浏览代码

HADOOP-4433. Improve data loader for collecting metrics and log files.
(Eric Yang via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@709533 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 年之前
父节点
当前提交
09188fdb66
共有 45 个文件被更改，包括 2329 次插入和 533 次删除
  1. 0 3
      CHANGES.txt
  2. 25 0
      src/contrib/chukwa/CHANGES.txt
  3. 1 1
      src/contrib/chukwa/bin/VERSION
  4. 35 0
      src/contrib/chukwa/bin/chukwa
  5. 51 50
      src/contrib/chukwa/bin/chukwa-config.sh
  6. 185 0
      src/contrib/chukwa/bin/chukwa-daemon.sh
  7. 38 0
      src/contrib/chukwa/bin/chukwa-daemons.sh
  8. 46 0
      src/contrib/chukwa/bin/nodeActivityDataLoader.sh
  9. 0 81
      src/contrib/chukwa/bin/shutdown.sh
  10. 68 0
      src/contrib/chukwa/bin/slaves.sh
  11. 33 0
      src/contrib/chukwa/bin/start-all.sh
  12. 31 0
      src/contrib/chukwa/bin/start-collectors.sh
  13. 39 0
      src/contrib/chukwa/bin/start-data-processors.sh
  14. 41 0
      src/contrib/chukwa/bin/start-probes.sh
  15. 0 54
      src/contrib/chukwa/bin/startup.sh
  16. 28 0
      src/contrib/chukwa/bin/stop-all.sh
  17. 26 0
      src/contrib/chukwa/bin/stop-collectors.sh
  18. 47 0
      src/contrib/chukwa/bin/stop-data-processors.sh
  19. 29 0
      src/contrib/chukwa/bin/stop-probes.sh
  20. 85 7
      src/contrib/chukwa/bin/systemDataLoader.sh
  21. 48 0
      src/contrib/chukwa/bin/torqueDataLoader.sh
  22. 95 60
      src/contrib/chukwa/bin/watchdog.sh
  23. 40 27
      src/contrib/chukwa/build.xml
  24. 1 0
      src/contrib/chukwa/conf/alert.conf.template
  25. 19 4
      src/contrib/chukwa/conf/chukwa-env.sh
  26. 19 4
      src/contrib/chukwa/conf/chukwa-env.sh.template
  27. 1 0
      src/contrib/chukwa/conf/chukwa-slaves.template
  28. 7 0
      src/contrib/chukwa/conf/hadoop-metrics.properties
  29. 1 0
      src/contrib/chukwa/conf/initial_adaptors.template
  30. 0 8
      src/contrib/chukwa/conf/nodeActivity.properties
  31. 0 8
      src/contrib/chukwa/conf/queueinfo.properties
  32. 0 8
      src/contrib/chukwa/conf/torque.properties
  33. 0 8
      src/contrib/chukwa/conf/util.properties
  34. 7 6
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/ChukwaTTInstru.java
  35. 146 82
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
  36. 87 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
  37. 64 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java
  38. 124 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java
  39. 541 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
  40. 50 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
  41. 45 25
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java
  42. 34 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java
  43. 0 97
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java
  44. 78 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
  45. 114 0
      src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java

+ 0 - 3
CHANGES.txt

@@ -46,9 +46,6 @@ Trunk (unreleased changes)
     HADOOP-4284. Support filters that apply to all requests, or global filters,
     to HttpServer. (Kan Zhang via cdouglas)
     
-    HADOOP-4431. Add versionning/tags to Chukwa Chunk. 
-    (Jerome Boulon via Johan)
-
     HADOOP-4276. Improve the hashing functions and deserialization of the 
     mapred ID classes. (omalley)
 

+ 25 - 0
src/contrib/chukwa/CHANGES.txt

@@ -0,0 +1,25 @@
+Trunk (unreleased changes)
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+    HADOOP-4431. Add versionning/tags to Chukwa Chunk. 
+    (Jerome Boulon via Johan)
+
+    HADOOP-4433. Improve data loader for collecting metrics and log files.
+    (Eric Yang via omalley)
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+Release 0.19.0 - Unreleased
+
+  NEW FEATURES
+
+    HADOOP-3719. Initial checkin of Chukwa, which is a data collection and 
+    analysis framework. (Jerome Boulon, Andy Konwinski, Ari Rabkin, 
+    and Eric Yang)

+ 1 - 1
src/contrib/chukwa/bin/VERSION

@@ -1 +1 @@
-0.0.1
+0.1.0

+ 35 - 0
src/contrib/chukwa/bin/chukwa

@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# The Chukwa command script
+#
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+# get arguments
+COMMAND=$1
+shift
+
+if [ -f "${CHUKWA_CONF_DIR}/chukwa-env.sh" ]; then
+  . "${CHUKWA_CONF_DIR}/chukwa-env.sh"
+fi
+
+exec "${CHUKWA_HOME}/bin/$@"

+ 51 - 50
src/contrib/chukwa/bin/chukwa-config.sh

@@ -36,69 +36,70 @@ script=`basename "$this"`
 bin=`cd "$bin"; pwd`
 this="$bin/$script"
 
-. ${bin}/../conf/chukwa-env.sh
-
-export HADOOP_HOME="${HADOOP_HOME:-${bin}/../../../..}"
 
 # the root of the Chukwa installation
-if [ -z $CHUKWA_HOME ] ; then
-CHUKWA_HOME=`dirname "$this"`/..
-export CHUKWA_HOME=`cd $CHUKWA_HOME; pwd`
+export CHUKWA_HOME=`dirname "$this"`/..
+
+#check to see if the conf dir is given as an optional argument
+if [ $# -gt 1 ]
+then
+    if [ "--config" = "$1" ]
+          then
+              shift
+              confdir=$1
+              shift
+              CHUKWA_CONF_DIR=$confdir
+    fi
 fi
 
-chukwaVersion=`cat ${CHUKWA_HOME}/bin/VERSION`
-DEFAULT_CHUKWA_HOME=${CHUKWA_HOME}/logs/
-export CHUKWA_LOG_DIR="${CHUKWA_LOG_DIR:-$DEFAULT_CHUKWA_HOME}"
-if [ ! -d $CHUKWA_LOG_DIR ]; then
-  mkdir -p $CHUKWA_LOG_DIR
+#check to see it is specified whether to use the slaves or the
+# masters file
+if [ $# -gt 1 ]
+then
+    if [ "--hosts" = "$1" ]
+    then
+        shift
+        slavesfile=$1
+        shift
+        export CHUKWA_SLAVES="${CHUKWA_CONF_DIR}/$slavesfile"
+    fi
 fi
 
-export chuwaRecordsRepository="/chukwa/repos/demo"
+#check to see if the conf dir is given as an optional argument
+if [ $# -gt 1 ]
+then
+    if [ "--watchdog" = "$1" ]
+          then
+              shift
+              WATCHDOG="true"
+    fi
+fi
 
-export DATACONFIG=${CHUKWA_HOME}/conf/mdl.xml
-common=`ls ${CHUKWA_HOME}/lib/*.jar`
-export common=`echo ${common} | sed 'y/ /:/'`
+export CHUKWA_LOG_DIR="$CHUKWA_HOME/logs"
 
-#chukwaCore=${HADOOP_HOME}/build/contrib/chukwa/chukwa-core-${chukwaVersion}.jar
-chukwaCore=${HADOOP_HOME}/build/contrib/chukwa
-if [ -a $chukwaCore ] ; then
-  export chukwaCore
-else
-  echo ${chukwaCore} does not exist
-  export chukwaCore=${CHUKWA_HOME}/chukwa-core-${chukwaVersion}.jar
-fi
+CHUKWA_VERSION=`cat ${CHUKWA_HOME}/bin/VERSION`
 
-#chukwaAgent=${HADOOP_HOME}/build/contrib/chukwa/chukwa-agent-${chukwaVersion}.jar
-chukwaAgent=${HADOOP_HOME}/build/contrib/chukwa
-if [ -a $chukwaAgent ] ; then
-  export chukwaAgent
-else
-  echo ${chukwaAgent} does not exist
-  export chukwaAgent=${CHUKWA_HOME}/chukwa-agent-${chukwaVersion}.jar
+# Allow alternate conf dir location.
+if [ -z "$CHUKWA_CONF_DIR" ]; then
+    CHUKWA_CONF_DIR="${CHUKWA_CONF_DIR:-$CHUKWA_HOME/conf}"
+    export CHUKWA_CONF_DIR=${CHUKWA_HOME}/conf
 fi
 
-echo chukwaCore is ${chukwaCore} and chukwaAgent is ${chukwaAgent}
+if [ -f "${CHUKWA_CONF_DIR}/chukwa-env.sh" ]; then
+  . "${CHUKWA_CONF_DIR}/chukwa-env.sh"
+fi
 
+export DATACONFIG=${CHUKWA_CONF_DIR}/mdl.xml
+COMMON=`ls ${CHUKWA_HOME}/lib/*.jar`
+export COMMON=`echo ${COMMON} | sed 'y/ /:/'`
+export CHUKWA_CORE=${CHUKWA_HOME}/chukwa-core-${CHUKWA_VERSION}.jar
+export CHUKWA_AGENT=${CHUKWA_HOME}/chukwa-agent-${CHUKWA_VERSION}.jar
+export HADOOP_JAR=`ls ${HADOOP_HOME}/hadoop-*-core.jar`
 export CURRENT_DATE=`date +%Y%m%d%H%M`
-export TS_CONFIG=${CHUKWA_HOME}/conf/ts
-export tomcat=${CHUKWA_HOME}/opt/apache-tomcat-6.0.16
-if [ -d ${HADOOP_HOME}/build/classes ]; then
-  DEFAULT_HADOOP_JAR=${HADOOP_HOME}/build/classes
-# this doesn't work, but needs to be replaced with something that does
-#elif [ls ${HADOOP_HOME}/build/hadoop-*-core.jar` ]; then
-#  echo setting DEFAULT_HADOOP_JAR to `ls ${HADOOP_HOME}/build/hadoop-*-core.jar`
-#  DEFAULT_HADOOP_JAR=`ls ${HADOOP_HOME}/build/hadoop-*-core.jar`
-else
-  DEFAULT_HADOOP_JAR=${CHUKWA_HOME}/hadoopjars/hadoop-0.18.0-core.jar
-fi
-export HADOOP_JAR=${HADOOP_JAR:-$DEFAULT_HADOOP_JAR}
 
-echo
-echo HADOOP_JAR is $HADOOP_JAR
-echo
+if [ -z "$JAVA_HOME" ] ; then
+    export JAVA_HOME=/usr/lib/j2sdk1.5-sun
+fi
 
-export CHUKWA_LOG_DIR="${CHUKWA_HOME}/logs/"
-DEFAULT_PID_DIR=${CHUKWA_HOME}/var/run
-export CHUKWA_PID_DIR="${CHUKWA_PID_DIR:-$DEFAULT_PID_DIR}"
-export chuwaRecordsRepository="/chukwa/repos/demo"
+export JPS=${JAVA_HOME}/bin/jps
 

+ 185 - 0
src/contrib/chukwa/bin/chukwa-daemon.sh

@@ -0,0 +1,185 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Runs a Chukwa command as a daemon.
+#
+# Environment Variables
+#
+#   CHUKWA_CONF_DIR  Alternate conf dir. Default is ${CHUKWA_HOME}/conf.
+#   CHUKWA_LOG_DIR   Where log files are stored.  PWD by default.
+#   CHUKWA_MASTER    host:path where chukwa code should be rsync'd from
+#   CHUKWA_PID_DIR   The pid files are stored. ${CHUKWA_HOME}/var/tmp by default.
+#   CHUKWA_IDENT_STRING   A string representing this instance of chukwa. $USER by default
+#   CHUKWA_NICENESS The scheduling priority for daemons. Defaults to 0.
+##
+
+usage="Usage: chukwa-daemon.sh [--config <conf-dir>] [--hosts hostlistfile] (start|stop) <chukwa-command> <args...>"
+
+# if no args specified, show usage
+if [ $# -le 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+# get arguments
+startStop=$1
+shift
+command=$1
+shift
+
+chukwa_rotate_log ()
+{
+    log=$1;
+    num=5;
+    if [ -n "$2" ]; then
+	num=$2
+    fi
+    if [ -f "$log" ]; then # rotate logs
+	while [ $num -gt 1 ]; do
+	    prev=`expr $num - 1`
+	    [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+	    num=$prev
+	done
+	mv "$log" "$log.$num";
+    fi
+}
+
+if [ -f "${CHUKWA_CONF_DIR}/chukwa-env.sh" ]; then
+  . "${CHUKWA_CONF_DIR}/chukwa-env.sh"
+fi
+
+# get log directory
+if [ "$CHUKWA_LOG_DIR" = "" ]; then
+  export CHUKWA_LOG_DIR="$CHUKWA_HOME/logs"
+fi
+mkdir -p "$CHUKWA_LOG_DIR"
+
+if [ "$CHUKWA_PID_DIR" = "" ]; then
+  CHUKWA_PID_DIR=$CHUKWA_HOME/var/run
+fi
+
+if [ "$CHUKWA_IDENT_STRING" = "" ]; then
+  export CHUKWA_IDENT_STRING="$USER"
+fi
+
+# some variables
+export CHUKWA_LOGFILE=chukwa-$CHUKWA_IDENT_STRING-$command-$HOSTNAME.log
+export CHUKWA_ROOT_LOGGER="INFO,DRFA"
+log=$CHUKWA_LOG_DIR/chukwa-$CHUKWA_IDENT_STRING-$command-$HOSTNAME.out
+pid=$CHUKWA_PID_DIR/chukwa-$CHUKWA_IDENT_STRING-$command.pid
+
+# Set default scheduling priority
+if [ "$CHUKWA_NICENESS" = "" ]; then
+    export CHUKWA_NICENESS=0
+fi
+
+case $startStop in
+
+  (start)
+    MAIL=`cat ${CHUKWA_HOME}/conf/alert.conf`
+
+    if [ "${WATCHDOG}" != "" ]; then
+        mkdir -p ${CHUKWA_HOME}/var/tmp >&/dev/null
+        crontab -l > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+        crontest=$?
+
+        if [ "X${crontest}" != "X0" ]; then
+          cat > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} << CRON
+MAILTO=${MAIL}
+1,30 * * * * ${CHUKWA_HOME}/bin/watchdog.sh
+CRON
+        else
+          grep -v "${CHUKWA_HOME}/bin/watchdog.sh" ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} | grep -v MAILTO > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2
+          echo "MAILTO=${MAIL}" > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+          cat ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2 >> ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+          rm -f ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}.2
+          cat >> ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} << CRON
+1,30 * * * * ${CHUKWA_HOME}/bin/watchdog.sh
+CRON
+        fi
+
+        # save crontab
+        echo -n "Registering watchdog.."
+        mkdir -p ${CHUKWA_HOME}/var/tmp >&/dev/null
+        crontab ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE} > /dev/null 2>&1
+        rm -f ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+        echo "done"
+    fi
+
+    mkdir -p "$CHUKWA_PID_DIR"
+
+    if [ -f $pid ]; then
+      if kill -0 `cat $pid` > /dev/null 2>&1; then
+        echo $command running as process `cat $pid`.  Stop it first.
+        exit 1
+      fi
+    fi
+
+    if [ "$CHUKWA_MASTER" != "" ]; then
+      echo rsync from $CHUKWA_MASTER
+      rsync -a -e ssh --delete --exclude=.svn $CHUKWA_MASTER/ "$CHUKWA_HOME"
+    fi
+
+    chukwa_rotate_log $log
+    echo starting $command, logging to $log
+    cd "$CHUKWA_HOME"
+    nohup nice -n $CHUKWA_NICENESS "$CHUKWA_HOME"/bin/chukwa -config $command "$@" > "$log" 2>&1 < /dev/null &
+    echo $! > $pid
+    sleep 1; head "$log"
+    ;;
+          
+  (stop)
+
+    if [ "${WATCHDOG}" != "" ]; then
+        # remove watchdog
+        crontab -l | grep -v ${CHUKWA_HOME}/bin/watchdog.sh > ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+        crontab ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+        rm -f ${CHUKWA_HOME}/var/tmp/cron.${CURRENT_DATE}
+    fi
+
+    if [ -f $CHUKWA_HOME/bin/$command ]; then
+      $CHUKWA_HOME/bin/$command stop
+      rm -f $pid
+    else
+      if [ -f $pid ]; then
+        if kill -0 `cat $pid` > /dev/null 2>&1; then
+          echo stopping $command
+          kill `cat $pid`
+          rm -f $pid
+        else
+          echo no $command to stop
+        fi
+      else
+        echo no $command to stop
+      fi
+    fi
+    ;;
+
+  (*)
+    echo $usage
+    exit 1
+    ;;
+
+esac
+
+

+ 38 - 0
src/contrib/chukwa/bin/chukwa-daemons.sh

@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Run a Chukwa command on all slave hosts.
+
+usage="Usage: chukwa-daemons.sh [--config confdir] [--hosts hostlistfile] [start|stop] command args..."
+
+# if no args specified, show usage
+if [ $# -le 1 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. $bin/chukwa-config.sh
+
+if [ ${WATCHDOG}!="" ]; then
+  exec "$bin/slaves.sh" --config $CHUKWA_CONF_DIR cd "$CHUKWA_HOME" \; "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR --watchdog "$@"
+else
+  exec "$bin/slaves.sh" --config $CHUKWA_CONF_DIR cd "$CHUKWA_HOME" \; "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR "$@"
+fi

+ 46 - 0
src/contrib/chukwa/bin/nodeActivityDataLoader.sh

@@ -0,0 +1,46 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+if [ "X$1" = "Xstop" ]; then
+  echo -n "Shutting down Node Activity Data Loader..."
+  if [ -f ${CHUKWA_HOME}/var/run/PbsNodes-data-loader.pid ]; then
+    kill -TERM `cat ${CHUKWA_HOME}/var/run/PbsNodes-data-loader.pid`
+  fi
+  echo "done"
+  exit 0
+fi
+
+EXISTS=0
+pidFile="${CHUKWA_HOME}/var/run/PbsNodes-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+    ${JAVA_HOME}/bin/java -DPERIOD=600 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=PbsNodes -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec "${nodeActivityCmde}" &
+fi
+
+

+ 0 - 81
src/contrib/chukwa/bin/shutdown.sh

@@ -1,81 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-java=$JAVA_HOME/bin/java
-
-. "$bin"/chukwa-config.sh
-
-# remove watchdog
-crontab -l | grep -v ${CHUKWA_HOME}/bin/watchdog.sh > /tmp/cron.${CURRENT_DATE}
-crontab /tmp/cron.${CURRENT_DATE}
-rm -f /tmp/cron.${CURRENT_DATE}
-
-# stop torque data loader
-pidFile=$CHUKWA_HOME/var/run/TorqueDataLoader.pid
-if [ -f $pidFile ]; then  
-   echo -n "Shutting down Torque Data Loader.."
-   torquepid=`head ${pidFile}`
-   kill -HUP ${torquepid}
-   # kill -HUP `ps eww |grep TorqueDataLoader |grep -v grep |cut -b 1-5` >/dev/null 2>&1
-   rm ${pidFile}
-   echo "done"
-else
-  echo " no $pidFile"
-fi
-
-# stop util data loader
-pidFile=$CHUKWA_HOME/var/run/UtilDataLoader.pid
-if [ -f $pidFile ]; then  
-    echo -n "Shutting down Util Data Loader.."
-    utilpid=`head ${pidFile}`
-    #kill -HUP `ps eww |grep UtilDataLoader |grep -v grep |cut -b 1-5` >/dev/null 2>&1
-    kill -HUP ${utilpid}
-    rm ${pidFile}
-    echo "done"
-else
-  echo " no $pidFile"
-fi
-
-# stop queue info data loader
-pidFile=$CHUKWA_HOME/var/run/QueueInfoDataLoader.pid
-if [ -f $pidFile ]; then  
-    echo -n "Shutting down Queue Info Data Loader.."
-    queuepid=`head ${pidFile}`
-    #kill -HUP `ps eww |grep QueueInfoDataLoader |grep -v grep |cut -b 1-5` >/dev/null 2>&1
-    kill -HUP ${queuepid}
-    rm ${pidFile}
-    echo "done"
-else 
-  echo " no $pidFile"
-fi
-
-
-# stop queue info data loader
-pidFile=$CHUKWA_HOME/var/run/MapReduceLogLoader.pid
-if [ -f $pidFile ]; then  
-    echo -n "Shutting down Map Reduce Log Loader.."
-    logpid=`head ${pidFile}`
-    #kill -HUP `ps eww |grep MapReduceLogLoader |grep -v grep |cut -b 1-5` >/dev/null 2>&1
-    kill -HUP ${logpid}
-    rm ${pidFile}
-    echo "done"
-else
-  echo " no $pidFile"
-fi
- 

+ 68 - 0
src/contrib/chukwa/bin/slaves.sh

@@ -0,0 +1,68 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Run a shell command on all slave hosts.
+#
+# Environment Variables
+#
+#   CHUKWA_SLAVES    File naming remote hosts.
+#     Default is ${CHUKWA_CONF_DIR}/chukwa-slaves.
+#   CHUKWA_CONF_DIR  Alternate conf dir. Default is ${CHUKWA_HOME}/conf.
+#   CHUKWA_SLAVE_SLEEP Seconds to sleep between spawning remote commands.
+#   CHUKWA_SSH_OPTS Options passed to ssh when running remote commands.
+##
+
+usage="Usage: slaves.sh [--config confdir] command..."
+
+# if no args specified, show usage
+if [ $# -le 0 ]; then
+  echo $usage
+  exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+# If the slaves file is specified in the command line,
+# then it takes precedence over the definition in 
+# hadoop-env.sh. Save it here.
+HOSTLIST=$CHUKWA_SLAVES
+
+if [ -f "${CHUKWA_CONF_DIR}/chukwa-env.sh" ]; then
+  . "${CHUKWA_CONF_DIR}/chukwa-env.sh"
+fi
+
+if [ "$HOSTLIST" = "" ]; then
+  if [ "$CHUKWA_SLAVES" = "" ]; then
+    export HOSTLIST="${CHUKWA_CONF_DIR}/chukwa-slaves"
+  else
+    export HOSTLIST="${CHUKWA_SLAVES}"
+  fi
+fi
+
+for slave in `cat "$HOSTLIST"`; do
+ ssh $CHUKWA_SSH_OPTS $slave $"${@// /\\ }" \
+   2>&1 | sed "s/^/$slave: /" &
+ if [ "$CHUKWA_SLAVE_SLEEP" != "" ]; then
+   sleep $CHUKWA_SLAVE_SLEEP
+ fi
+done
+
+wait

+ 33 - 0
src/contrib/chukwa/bin/start-all.sh

@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Start all chukwa daemons.  Run this on master node.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+# start probes
+"$bin"/start-probes.sh --config $CHUKWA_CONF_DIR
+
+# start data processors
+"$bin"/start-data-processors.sh --config $CHUKWA_CONF_DIR
+
+# start collectors
+"$bin"/start-collectors.sh --config $CHUKWA_CONF_DIR

+ 31 - 0
src/contrib/chukwa/bin/start-collectors.sh

@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Start Chukwa collector daemons.
+# Launches the jetty collector on all hosts
+# listed in the collectors file. Run this on the master node.
+
+usage="Usage: start-collectors.sh"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+# start jetty collectors
+"$bin"/chukwa-daemons.sh --config $CHUKWA_CONF_DIR --hosts collectors --watchdog start jettyCollector.sh

+ 39 - 0
src/contrib/chukwa/bin/start-data-processors.sh

@@ -0,0 +1,39 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+java=$JAVA_HOME/bin/java
+
+. "$bin"/chukwa-config.sh
+if [ ! -d ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16 ]; then
+  if [ -f ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16.tar.gz ]; then
+    tar fxz ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16.tar.gz -C ${CHUKWA_HOME}/opt
+  fi
+fi
+
+if [ ! -f ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16/webapps/hicc-${CHUKWA_VERSION}.war ]; then
+  if [ -f ${CHUKWA_HOME}/hicc-${CHUKWA_VERSION}.war ]; then
+    cp ${CHUKWA_HOME}/hicc-${CHUKWA_VERSION}.war ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16/webapps
+  fi
+fi 
+
+# start data processors
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR --watchdog start processSinkFiles.sh watchdog
+
+# start database admin script
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR start dbAdmin.sh

+ 41 - 0
src/contrib/chukwa/bin/start-probes.sh

@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Start Chukwa probe daemons: agents, system data loaders,
+# and (if configured) the torque and node-activity data loaders.
+# Run this on the master node.
+
+usage="Usage: start-probes.sh"
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+# start agents
+"$bin"/chukwa-daemons.sh --config $CHUKWA_CONF_DIR --watchdog start agent.sh
+# start system data loader daemons
+"$bin"/chukwa-daemons.sh --config $CHUKWA_CONF_DIR start systemDataLoader.sh
+
+# start torque data loader daemons
+if [ ${TORQUE_HOME} != "" ]; then
+  "$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR start torqueDataLoader.sh
+fi
+if [ ${nodeActivityCmde} != "" ]; then
+  "$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR start nodeActivityDataLoader.sh
+fi

+ 0 - 54
src/contrib/chukwa/bin/startup.sh

@@ -1,54 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-java=$JAVA_HOME/bin/java
-
-. "$bin"/chukwa-config.sh
-crontab -l > /tmp/cron.${CURRENT_DATE}
-crontest=$?
-
-if [ ! -d ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16 ]; then
-  tar fxz ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16.tar.gz -C ${CHUKWA_HOME}/opt
-fi
-
-if [ ! -f ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16/webapps/ts.war ]; then
-  cp ${CHUKWA_HOME}/ts.war ${CHUKWA_HOME}/opt/apache-tomcat-6.0.16/webapps
-fi 
-
-if [ "X$?" != "X0" ]; then
-  cat > /tmp/cron.${CURRENT_DATE} << CRON
-* * * * * ${CHUKWA_HOME}/bin/watchdog.sh
-10 * * * * ${CHUKWA_HOME}/tools/expire.sh 3 /grid/0/tmp/mdl
-CRON
-else
-  grep -v "${CHUKWA_HOME}/bin/watchdog.sh" /tmp/cron.${CURRENT_DATE} | \
-  grep -v "${CHUKWA_HOME}/tools/expire.sh 3 /grid/0/tmp/mdl" > /tmp/cron.${CURRENT_DATE}.2
-  mv /tmp/cron.${CURRENT_DATE}.2 /tmp/cron.${CURRENT_DATE}
-  cat >> /tmp/cron.${CURRENT_DATE} << CRON
-* * * * * ${CHUKWA_HOME}/bin/watchdog.sh
-10 * * * * ${CHUKWA_HOME}/tools/expire.sh 3 /grid/0/tmp/mdl
-CRON
-fi
-
-# save crontab
-echo -n "Registering data loader cron jobs.."
-crontab /tmp/cron.${CURRENT_DATE} > /dev/null 2>&1
-rm -f /tmp/cron.${CURRENT_DATE}
-echo "done"
-

+ 28 - 0
src/contrib/chukwa/bin/stop-all.sh

@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Stop all chukwa daemons.  Run this on master node.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+"$bin"/stop-probes.sh --config $CHUKWA_CONF_DIR
+"$bin"/stop-data-processors.sh --config $CHUKWA_CONF_DIR
+"$bin"/stop-collectors.sh --config $CHUKWA_CONF_DIR

+ 26 - 0
src/contrib/chukwa/bin/stop-collectors.sh

@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Stop collectors.  Run this on master node.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+"$bin"/chukwa-daemons.sh --config $CHUKWA_CONF_DIR --hosts collectors --watchdog stop jettyCollector.sh

+ 47 - 0
src/contrib/chukwa/bin/stop-data-processors.sh

@@ -0,0 +1,47 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+java=$JAVA_HOME/bin/java
+
+. "$bin"/chukwa-config.sh
+
+# stop processSinkFiles.sh
+pidFile=$CHUKWA_HOME/var/run/ProcessSinkFiles.pid
+if [ -f $pidFile ]; then  
+   echo -n "Shutting down Data Processors.."
+   DP_PID=`head ${pidFile}`
+   kill -TERM ${DP_PID}
+   rm ${pidFile}
+   echo "done"
+else
+  echo " no $pidFile"
+fi
+
+# stop dbAdmin.sh
+pidFile=$CHUKWA_HOME/var/run/dbAdmin.pid
+if [ -f $pidFile ]; then  
+   echo -n "Shutting down Database Admin.."
+   DBADMIN_PID=`head ${pidFile}`
+   kill -TERM ${DBADMIN_PID}
+   rm ${pidFile}
+   echo "done"
+else
+  echo " no $pidFile"
+fi
+

+ 29 - 0
src/contrib/chukwa/bin/stop-probes.sh

@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Stop probes.  Run this on master node.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+"$bin"/chukwa-daemons.sh --config $CHUKWA_CONF_DIR stop systemDataLoader.sh
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR stop torqueDataLoader.sh
+"$bin"/chukwa-daemon.sh --config $CHUKWA_CONF_DIR stop nodeActivityDataLoader.sh
+"$bin"/chukwa-daemons.sh --config $CHUKWA_CONF_DIR --watchdog stop agent.sh

+ 85 - 7
src/contrib/chukwa/bin/systemDataLoader.sh

@@ -1,3 +1,6 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
 # The ASF licenses this file to You under the Apache License, Version 2.0
@@ -12,17 +15,92 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-pid=$$
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/chukwa-config.sh
 
-echo "${pid}" > "$CHUKWA_HOME/var/run/systemDataLoader.pid"
+JVM_OPTS="-Xms4M -Xmx4M"
+
+if [ "X$1" = "Xstop" ]; then
+  echo -n "Shutting down System Data Loader..."
+  if [ -f ${CHUKWA_HOME}/var/run/Sar-data-loader.pid ]; then
+    kill -TERM `cat ${CHUKWA_HOME}/var/run/Sar-data-loader.pid`
+  fi
+  if [ -f ${CHUKWA_HOME}/var/run/Iostat-data-loader.pid ]; then
+    kill -TERM `cat ${CHUKWA_HOME}/var/run/Iostat-data-loader.pid`
+  fi
+  if [ -f ${CHUKWA_HOME}/var/run/Top-data-loader.pid ]; then
+    kill -TERM `cat ${CHUKWA_HOME}/var/run/Top-data-loader.pid`
+  fi
+  if [ -f ${CHUKWA_HOME}/var/run/Df-data-loader.pid ]; then
+    kill -TERM `cat ${CHUKWA_HOME}/var/run/Df-data-loader.pid`
+  fi
+  echo "done"
+  exit 0
+fi
+
+echo -n "Starting System Data Loader..."
+
+#test=`grep -q SysLog ${CHUKWA_HOME}/var/chukwa_checkpoint*`
+#if [ "X${test}"="X1" ]; then
+#  echo "add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped SysLog 0 /var/log/messages 0" | nc localhost 9093 >&/dev/null & disown -h 
+#fi
+
+EXISTS=0
+pidFile="${CHUKWA_HOME}/var/run/Sar-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+    ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=Sar -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec sar -q -r -n FULL 55 &
+fi
+
+EXISTS=0
+pidFile="${CHUKWA_HOME}/var/run/Iostat-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=Iostat -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec iostat -x 55 2 &
+fi
+
+EXISTS=0
+pidFile="${CHUKWA_HOME}/var/run/Top-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
+
+if [ ${EXISTS} -lt 1 ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=Top -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec top -b -n 1 -c &
+fi
+
+EXISTS=0
+pidFile="${CHUKWA_HOME}/var/run/Df-data-loader.pid"
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep Exec | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -ge 1 ]; then
+    EXISTS=1
+  fi
+fi
 
-${JAVA_HOME}/bin/java -DCHUKWA_HOME=${CHUKWA_HOME} -DRECORD_TYPE=Sar -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${chukwaCore}:${hadoop_jar}:${common}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec sar -q -r -n FULL 55 &
-${JAVA_HOME}/bin/java -DCHUKWA_HOME=${CHUKWA_HOME} -DRECORD_TYPE=Iostat -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${chukwaCore}:${hadoop_jar}:${common}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec iostat -x 55 2 &
-${JAVA_HOME}/bin/java -DCHUKWA_HOME=${CHUKWA_HOME} -DRECORD_TYPE=Top -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${chukwaCore}:${hadoop_jar}:${common}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec top -b -n 1 -c &
-#${JAVA_HOME}/bin/java -DRECORD_TYPE=Df -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${chukwaAgent}:${hadoop_jar}:${common}:${tools}:${CHUKWA_HOME}/conf org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec df -x nfs -x none &
+if [ ${EXISTS} -lt 1 ]; then
+  ${JAVA_HOME}/bin/java $JVM_OPTS -DPERIOD=60 -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=Df -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${HADOOP_JAR}:${COMMON}:${TOOLS}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec df -l &
+fi
 
+echo "done"

+ 48 - 0
src/contrib/chukwa/bin/torqueDataLoader.sh

@@ -0,0 +1,48 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/chukwa-config.sh
+
+java=$JAVA_HOME/bin/java
+
+if [ "X$1" = "Xstop" ]; then
+  echo -n "Shutting down Torque Data Loader..."
+  if [ -f ${CHUKWA_HOME}/var/run/TorqueDataLoader.pid ]; then
+    kill -TERM `cat ${CHUKWA_HOME}/var/run/TorqueDataLoader.pid`
+    rm -f ${CHUKWA_HOME}/var/run/TorqueDataLoader.pid
+  fi
+  echo "done"
+  exit 0
+fi
+
+min=`date +%M`
+
+
+# start torque data loader
+pidFile=$CHUKWA_HOME/var/run/TorqueDataLoader.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ${pid} | grep TorqueDataLoader | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      ${java} -DDOMAIN=${DOMAIN} -DTORQUE_SERVER=${TORQUE_SERVER} -DTORQUE_HOME=${TORQUE_HOME} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=Torque -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${COMMON}:${HADOOP_JAR}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.mdl.TorqueDataLoader&
+  fi 
+else
+      ${java} -DDOMAIN=${DOMAIN} -DTORQUE_SERVER=${TORQUE_SERVER} -DTORQUE_HOME=${TORQUE_HOME} -DCHUKWA_HOME=${CHUKWA_HOME} -DCHUKWA_CONF_DIR=${CHUKWA_CONF_DIR} -DRECORD_TYPE=Torque -Dlog4j.configuration=system-data-loader.properties -classpath ${CLASSPATH}:${CHUKWA_CORE}:${COMMON}:${HADOOP_JAR}:${CHUKWA_CONF_DIR} org.apache.hadoop.chukwa.inputtools.mdl.TorqueDataLoader&
+fi

+ 95 - 60
src/contrib/chukwa/bin/watchdog.sh

@@ -21,92 +21,127 @@ bin=`cd "$bin"; pwd`
 . "$bin"/chukwa-config.sh
 
 java=$JAVA_HOME/bin/java
-jps=$JAVA_HOME/bin/jps
 
 
 min=`date +%M`
 
+if [ "$CHUKWA_IDENT_STRING" = "" ]; then
+  export CHUKWA_IDENT_STRING="$USER"
+fi
 
-# start torque data loader
-pidFile=$CHUKWA_HOME/var/run/TorqueDataLoader.pid
+# monitor agent
+pidFile=$CHUKWA_HOME/var/run/chukwa-$CHUKWA_IDENT_STRING-agent.sh.pid
 if [ -f $pidFile ]; then
   pid=`head ${pidFile}`
-  ChildPIDRunningStatus=`${jps} | grep ${pid} | grep TorqueDataLoader | grep -v grep | wc -l`
-  #ChildPIDRunningStatus=`ps -ef | grep TorqueDataLoader | grep -v grep | wc -l`
+  ChildPIDRunningStatus=`ps ax | grep ${pid} | grep agent.sh | grep -v grep | wc -l`
   if [ $ChildPIDRunningStatus -lt 1 ]; then
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=torque.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.TorqueDataLoader&
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: agent pid file exists, but process missing.  Restarting agent.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start agent.sh &
   fi 
-else
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=torque.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.TorqueDataLoader&
 fi
-# start util data loader
-pidFile=$CHUKWA_HOME/var/run/UtilDataLoader.pid
+
+# monitor collector
+pidFile=$CHUKWA_HOME/var/run/chukwa-$CHUKWA_IDENT_STRING-jettyCollector.sh.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep ${pid} | grep jettyCollector.sh | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: collector pid file exists, but process missing.  Restarting jettyCollector.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start jettyCollector.sh &
+  fi
+fi
+
+# monitor node activity data loader
+pidFile=$CHUKWA_HOME/var/run/PbsNodes-data-loader.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ^${pid} | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: PbsNodes-data-loader pid file exists, but process missing.  Restarting nodeActivityDataLoader.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start nodeActivityDataLoader.sh &
+  fi
+fi
+
+# monitor system data loader
+pidFile=$CHUKWA_HOME/var/run/Df-data-loader.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ^${pid} | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: Df-data-loader pid file exists, but process missing.  Restarting systemDataLoader.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start systemDataLoader.sh &
+  fi
+fi
+
+pidFile=$CHUKWA_HOME/var/run/Iostat-data-loader.pid
 if [ -f $pidFile ]; then
   pid=`head ${pidFile}`
-  ChildPIDRunningStatus=`${jps} | grep ${pid} | grep UtilDataLoader | grep -v grep | wc -l`
-  #ChildPIDRunningStatus=`ps -ef | grep UtilDataLoader | grep -v grep | wc -l`
+  ChildPIDRunningStatus=`${JPS} | grep ^${pid} | grep -v grep | wc -l`
   if [ $ChildPIDRunningStatus -lt 1 ]; then
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=util.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.UtilDataLoader&
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: Iostat-data-loader pid file exists, but process missing.  Restarting systemDataLoader.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start systemDataLoader.sh &
   fi
-else 
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=util.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.UtilDataLoader&
 fi
 
-# start queue info data loader
-pidFile=$CHUKWA_HOME/var/run/QueueInfoDataLoader.pid
+pidFile=$CHUKWA_HOME/var/run/Sar-data-loader.pid
 if [ -f $pidFile ]; then
   pid=`head ${pidFile}`
-  ChildPIDRunningStatus=`${jps} | grep ${pid} | grep QueueInfoDataLoader | grep -v grep | wc -l`
-  #ChildPIDRunningStatus=`ps -ef | grep QueueInfoDataLoader | grep -v grep | wc -l`
+  ChildPIDRunningStatus=`${JPS} | grep ^${pid} | grep -v grep | wc -l`
   if [ $ChildPIDRunningStatus -lt 1 ]; then
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=queueinfo.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.QueueInfoDataLoader&
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: Sar-data-loader pid file exists, but process missing.  Restarting systemDataLoader.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start systemDataLoader.sh &
   fi
-else
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=queueinfo.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.QueueInfoDataLoader&
 fi
 
-# start map reduce log data loader
-tenmin=`echo ${min} | cut -b 2-`
-if [ "X${tenmin}" == "X0" ]; then
-    pidFile=$CHUKWA_HOME/var/run/JobLogDataLoader.pid
-    if [ -f $pidFile ]; then
-        pid=`head ${pidFile}`
-        ChildPIDRunningStatus=`${jps} | grep ${pid} | grep JobLogDataLoader |  wc -l`
-        if [ $ChildPIDRunningStatus -lt 1 ]; then
-            ${java} -Xms128m -Xmx1280m -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=joblog.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.JobLogDataLoader &
-        fi
-    else
-        ${java} -Xms128m -Xmx1280m -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=joblog.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.mdl.JobLogDataLoader &
-    fi
+pidFile=$CHUKWA_HOME/var/run/Top-data-loader.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ^${pid} | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: Top-data-loader pid file exists, but process missing.  Restarting systemDataLoader.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start systemDataLoader.sh &
+  fi
 fi
 
-# start node activity plugin
-tenmin=`echo ${min} | cut -b 2-`
-if [ "X${tenmin}" == "X0" ]; then
-  pidFile=$CHUKWA_HOME/var/run/NodeActivityPlugin.pid
-  if [ -f $pidFile ]; then
-    pid=`head ${pidFile}`
-    ChildPIDRunningStatus=`${jps} | grep ${pid} | grep NodeActivityMDL | wc -l`
-    if [ $ChildPIDRunningStatus -lt 1 ]; then
-       ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=nodeActivity.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.plugin.nodeactivity.NodeActivityMDL&
-    fi
-  else
-      ${java} -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=nodeActivity.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${common} org.apache.hadoop.chukwa.sources.plugin.nodeactivity.NodeActivityMDL&
+# monitor torque data loader
+pidFile=$CHUKWA_HOME/var/run/TorqueDataLoader.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`${JPS} | grep ^${pid} | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: pid file exists, but process missing.  Restarting torqueDataLoader.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start torqueDataLoader.sh &
   fi
 fi
 
-# start database summary loader
-tenmin=`echo ${min} | cut -b 2-`
-if [ "X${tenmin}" == "X0" ]; then
-    pidFile=$CHUKWA_HOME/var/run/DBSummaryLoader.pid
-    if [ -f $pidFile ]; then
-        pid=`head ${pidFile}`
-        ChildPIDRunningStatus=`${jps} | grep ${pid} | grep DBSummaryLoader | wc -l`
-        if [ $ChildPIDRunningStatus -lt 1 ]; then
-            ${java} -Xms128m -Xmx1280m -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=log4j.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${ckit}:${common} org.apache.hadoop.chukwa.extraction.DBSummaryLoader &
-        fi
-    else
-        ${java} -Xms128m -Xmx1280m -DCHUKWA_HOME=${CHUKWA_HOME} -Dlog4j.configuration=log4j.properties -classpath ${CLASSPATH}:${chukwa}:${ikit}:${ckit}:${common} org.apache.hadoop.chukwa.extraction.DBSummaryLoader &
-    fi
+# monitor dataSinkFiles.sh
+pidFile=$CHUKWA_HOME/var/run/chukwa-$CHUKWA_IDENT_STRING-processSinkFiles.sh.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep ${pid} | grep processSinkFiles.sh | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: pid file exists, but process missing.  Restarting processSinkFiles.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start processSinkFiles.sh &
+  fi
 fi
 
+# monitor dbAdmin.sh
+pidFile=$CHUKWA_HOME/var/run/chukwa-$CHUKWA_IDENT_STRING-dbAdmin.sh.pid
+if [ -f $pidFile ]; then
+  pid=`head ${pidFile}`
+  ChildPIDRunningStatus=`ps ax | grep ${pid} | grep dbAdmin.sh | grep -v grep | wc -l`
+  if [ $ChildPIDRunningStatus -lt 1 ]; then
+      HOSTNAME=`hostname`
+      echo "${HOSTNAME}: pid file exists, but process missing.  Restarting dbAdmin.sh."
+      "$bin/chukwa-daemon.sh" --config $CHUKWA_CONF_DIR start dbAdmin.sh &
+  fi
+fi

+ 40 - 27
src/contrib/chukwa/build.xml

@@ -20,9 +20,7 @@
 <project name="chukwa" default="main">
 
   <import file="../build-contrib.xml"/>
-
-
-	<property name="chukwaVersion" value="0.0.1"/>
+	<property name="chukwaVersion" value="0.1.0"/>
 
 	<property name="lib.dir" value="${basedir}/lib"/>
 	<property name="src.dir" value="${basedir}/src"/>
@@ -60,15 +58,6 @@
 		<mkdir dir="${build.dir}"/>
 		<mkdir dir="${build.classes}"/>
 		<mkdir dir="${build.dir}/test"/>
-
-		<!-- 
-    	<copy todir="${build.dir}">
-            <fileset dir="${basedir}">
-                <exclude name="**/VERSION"/>
-            </fileset>
-        </copy>
-      
--->
 		<exec executable="echo" output="${basedir}/bin/VERSION">
 			<arg line="${chukwaVersion}" />
 		</exec>
@@ -106,9 +95,6 @@
 
 
 
-	<target name="test" depends="compile,compile-test,test-chunk" description="Chukwa Tests">
-	</target>
-
 	<target name="test-chunk" depends="compile,compile-test" description="Test chunk">
                
           <junit showoutput="yes"
@@ -136,16 +122,16 @@
 		<mkdir dir="${build.dir}/collector/WEB-INF/lib"/>
 		<mkdir dir="${build.dir}/collector/META-INF"/>
 		<copy todir="${build.dir}/collector/WEB-INF/classes">
-			<fileset dir="${build.dir}">
+			<fileset dir="${build.classes}">
 				<include name="org/apache/hadoop/chukwa/conf/**/*.class" />
 			</fileset>
-			<fileset dir="${build.dir}">
+			<fileset dir="${build.classes}">
 				<include name="org/apache/hadoop/chukwa/datacollection/**/*.class" />
 			</fileset>
-			<fileset dir="${build.dir}">
+			<fileset dir="${build.classes}">
 				<include name="org/apache/hadoop/chukwa/extraction/**/*.class" />
 			</fileset>
-			<fileset dir="${build.dir}">
+			<fileset dir="${build.classes}">
 				<include name="org/apache/hadoop/chukwa/util/**/*.class" />
 			</fileset>
 			<fileset dir="${basedir}/src/java">
@@ -182,7 +168,7 @@
 		</jar>
 	</target>
 	<target name="tools_jar" depends="compile, collector" description="Create tools jar">
-		<jar jarfile="${build.dir}/tools-${chukwaVersion}.jar" basedir="${build.dir}" includes="org/apache/hadoop/chukwa/inputtools/**/*.class">
+		<jar jarfile="${build.dir}/tools-${chukwaVersion}.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/inputtools/**/*.class">
 			<fileset dir="${basedir}/src/java">
 				<include name="org/apache/hadoop/chukwa/inputtools/**/*.java"/>
 			</fileset>
@@ -190,7 +176,7 @@
 	</target>
 
 	<target name="agent_jar" depends="compile, collector" description="Create agent jar">
-		<jar jarfile="${build.dir}/chukwa-agent-${chukwaVersion}.jar" basedir="${build.dir}" includes="org/apache/hadoop/chukwa/client/**/*.class" >
+		<jar jarfile="${build.dir}/chukwa-agent-${chukwaVersion}.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/client/**/*.class" >
 			<fileset dir="${build.dir}">
 				<include name="org/apache/hadoop/chukwa/conf/**/*.class"/>
 			</fileset>
@@ -226,7 +212,7 @@
 	</target>
 
 	<target name="chukwa_jar" depends="compile, collector" description="Create chukwa-core jar">
-		<jar jarfile="${build.dir}/chukwa-core-${chukwaVersion}.jar" basedir="${build.dir}" includes="org/apache/hadoop/chukwa/datacollection/**/*.class" >
+		<jar jarfile="${build.dir}/chukwa-core-${chukwaVersion}.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/datacollection/**/*.class" >
                         <manifest>
                             <section name="org/apache/hadoop/chukwa">
                                 <attribute name="Implementation-Title" value="Chukwa"/>
@@ -246,7 +232,7 @@
 
 	<target name="chukwa-hadoop_jar" depends="compile" description="Create chukwa_hadoop jar for use with getting hadoop to use chukwa">
 
-		<jar jarfile="${build.dir}/chukwa-hadoop-${chukwaVersion}-client.jar" basedir="${build.dir}" includes="org/apache/hadoop/chukwa/inputtools/log4j/**/*.class">
+		<jar jarfile="${build.dir}/chukwa-hadoop-${chukwaVersion}-client.jar" basedir="${build.classes}" includes="org/apache/hadoop/chukwa/inputtools/log4j/**/*.class">
 			<fileset dir="${basedir}/src/java">
 				<include name="org/apache/hadoop/chukwa/inputtools/log4j/**/*.java"/>
 		        <include name="org/apache/hadoop/chukwa/datacollection/client/**/*.java"/>
@@ -261,7 +247,6 @@
 	</target>
 
 	<target name="compress" depends="compile,collector,collector_jar,tools_jar,agent_jar,chukwa_jar,chukwa-hadoop_jar" description="Compression target">
-		<!-- 	<jar jarfile="${build.dir}/chukwa.jar" basedir="${build.dir}" includes="org/apache/hadoop/chukwa/**/*.class" /> -->
 
 		<copy todir="." includeEmptyDirs="false">
 			<fileset dir="${build.dir}">
@@ -271,6 +256,32 @@
 		</copy>
 	</target>
 
+	<target name="test" depends="compile,compile-test,test-chunk,test-input-tools" description="Automated Test Framework">
+	</target>
+
+	<target name="test-input-tools" depends="compile,compile-test" description="Test Input Tools">
+            <junit showoutput="${test.output}"
+             printsummary="${test.junit.printsummary}"
+             haltonfailure="${test.junit.haltonfailure}"
+             fork="yes"
+             forkmode="${test.junit.fork.mode}"
+             maxmemory="${test.junit.maxmemory}"
+             dir="${basedir}" timeout="${test.timeout}"
+             errorProperty="tests.failed" failureProperty="tests.failed">
+               <sysproperty key="CHUKWA_HOME" value="${basedir}"/>
+               <sysproperty key="CHUKWA_CONF_DIR" value="${basedir}/conf"/>
+               <classpath refid="chukwaTestClasspath"/>
+               <formatter type="${test.junit.output.format}" />
+               <batchtest todir="${build.dir}/test" >
+               <fileset dir="${test.src.dir}" includes="org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/*.java"/>
+               </batchtest>
+            </junit>
+            <delete>
+               <fileset dir="${basedir}/var" includes="*"/>
+            </delete>
+            <fail if="tests.failed">Tests failed!</fail>
+	</target>
+
 	<!-- ====================================================== -->
 	<!-- Macro definitions                                      -->
 	<!-- ====================================================== -->
@@ -323,7 +334,7 @@
 		</copy>
 
 		<copy todir="${build.dir}/${final.name}/conf">
-			<fileset dir="${conf.dir}" excludes="**/*.template"/>
+			<fileset dir="${conf.dir}"/>
 		</copy>
 
 		<copy todir="${build.dir}/${final.name}/docs">
@@ -372,6 +383,8 @@
 			<param.listofitems>
 				<tarfileset dir="${build.dir}" mode="664">
 					<exclude name="${final.name}/org/*" />
+					<exclude name="${final.name}/collector/**" />
+					<exclude name="${final.name}/${final.name}/**" />
 					<include name="${final.name}/**" />
 				</tarfileset>
 				<tarfileset dir="${build.dir}" mode="755">
@@ -420,8 +433,8 @@
 	</path>
         <path id="chukwaTestClasspath">
                 <pathelement location="${hadoop.root}/build/classes"/>
-                <pathelement location="${hadoop.root}/build/contrib/chukwa/classes/"/>
-                <pathelement location="${hadoop.root}/build/contrib/chukwa/test/classes/"/>
+                <pathelement location="${build.classes}"/>
+                <pathelement location="${test.build.classes}"/>
                 <fileset dir="${lib.dir}">
                         <include name="**/*.jar" />
                         <exclude name="**/excluded/" />

+ 1 - 0
src/contrib/chukwa/conf/alert.conf.template

@@ -0,0 +1 @@
+user@example.com

+ 19 - 4
src/contrib/chukwa/conf/chukwa-env.sh

@@ -6,20 +6,35 @@
 # remote nodes.
 
 # The java implementation to use.  Required.
-export JAVA_HOME=/Library/Java/Home/
+export JAVA_HOME=/usr/lib/j2sdk1.5-sun
 
 # The location of the Hadoop the collector should use. Default 
 # assumes that this chukwa is living in hadoop's src/contrib directory
-#export HADOOP_HOME=
+export HADOOP_HOME="/usr/lib/hadoop/current"
 
 # The directory where pid files are stored. CHUKWA_HOME/var/run by default.
 #export CHUKWA_PID_DIR=
 
 # The location of chukwa logs, defaults to CHUKWA_HOME/logs
-#export CHUKWA_LOG_DIR=
+export CHUKWA_LOG_DIR=${CHUKWA_HOME}/logs
 
 # The location of a hadoop jars. use this if you are running a collector
 # without a running HDFS (i.e. which writes sequence files to local disk)
 # if this is not set, the default is to check HADOOP_HOME for jars or
 # classes, if those are not found, uses hadoop jars which come with chukwa
-#export HADOOP_JAR=
+export HADOOP_JAR=`ls ${HADOOP_HOME}/hadoop-*-core.jar`
+
+# The location of chukwa data repository
+export chuwaRecordsRepository="/chukwa/repos/"
+
+# The location of torque pbsnodes command
+export nodeActivityCmde="/usr/lib/torque/current/bin/pbsnodes "
+
+# The server which contain pbsnodes, qstat and tracejob.
+export TORQUE_SERVER=localhost
+
+# The location contain torque binaries.
+export TORQUE_HOME=/usr/lib/torque
+
+# The domain of the cluster
+#export DOMAIN=

+ 19 - 4
src/contrib/chukwa/conf/chukwa-env.sh.template

@@ -6,20 +6,35 @@
 # remote nodes.
 
 # The java implementation to use.  Required.
-export JAVA_HOME=/Library/Java/Home/
+export JAVA_HOME=/usr/lib/j2sdk1.5-sun
 
 # The location of the Hadoop the collector should use. Default 
 # assumes that this chukwa is living in hadoop's src/contrib directory
-#export HADOOP_HOME=
+export HADOOP_HOME="/usr/lib/hadoop/current"
 
 # The directory where pid files are stored. CHUKWA_HOME/var/run by default.
 #export CHUKWA_PID_DIR=
 
 # The location of chukwa logs, defaults to CHUKWA_HOME/logs
-#export CHUKWA_LOG_DIR=
+export CHUKWA_LOG_DIR=${CHUKWA_HOME}/logs
 
 # The location of a hadoop jars. use this if you are running a collector
 # without a running HDFS (i.e. which writes sequence files to local disk)
 # if this is not set, the default is to check HADOOP_HOME for jars or
 # classes, if those are not found, uses hadoop jars which come with chukwa
-#export HADOOP_JAR=
+export HADOOP_JAR=`ls ${HADOOP_HOME}/hadoop-*-core.jar`
+
+# The location of chukwa data repository
+export chuwaRecordsRepository="/chukwa/repos/"
+
+# The location of torque pbsnodes command
+export nodeActivityCmde="/usr/lib/torque/current/bin/pbsnodes "
+
+# The server which contains pbsnodes, qstat and tracejob.
+export TORQUE_SERVER=localhost
+
+# The location containing torque binaries.
+export TORQUE_HOME=/usr/lib/torque
+
+# The domain of the cluster
+#export DOMAIN=

+ 1 - 0
src/contrib/chukwa/conf/chukwa-slaves.template

@@ -0,0 +1 @@
+localhost

+ 7 - 0
src/contrib/chukwa/conf/hadoop-metrics.properties

@@ -0,0 +1,7 @@
+dfs.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+dfs.period=60
+
+jvm.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
+jvm.period=60
+
+

+ 1 - 0
src/contrib/chukwa/conf/initial_adaptors.template

@@ -0,0 +1 @@
+add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped SysLog 0 /var/log/messages 0

+ 0 - 8
src/contrib/chukwa/conf/nodeActivity.properties

@@ -1,8 +0,0 @@
-log4j.rootLogger=INFO, R 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=${CHUKWA_HOME}/logs/NodeActivityPlugin.log
-log4j.appender.R.MaxFileSize=10MB
-log4j.appender.R.MaxBackupIndex=10
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} %p %t %c{1} - %m%n
-log4j.logger.org.apache.hadoop.chukwa.ikit.DataConfig=INFO, R

+ 0 - 8
src/contrib/chukwa/conf/queueinfo.properties

@@ -1,8 +0,0 @@
-log4j.rootLogger=INFO, R 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=${CHUKWA_HOME}/logs/QueueInfoDataLoader.log
-log4j.appender.R.MaxFileSize=10MB
-log4j.appender.R.MaxBackupIndex=10
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} %p %t %c{1} - %m%n
-log4j.logger.org.apache.hadoop.chukwa.ikit.DataConfig=INFO, R

+ 0 - 8
src/contrib/chukwa/conf/torque.properties

@@ -1,8 +0,0 @@
-log4j.rootLogger=INFO, R 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=${CHUKWA_HOME}/logs/TorqueDataLoader.log
-log4j.appender.R.MaxFileSize=10MB
-log4j.appender.R.MaxBackupIndex=10
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} %p %t %c{1} - %m%n
-log4j.logger.org.apache.hadoop.chukwa.ikit.DataConfig=INFO, R

+ 0 - 8
src/contrib/chukwa/conf/util.properties

@@ -1,8 +0,0 @@
-log4j.rootLogger=INFO, R 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=${CHUKWA_HOME}/logs/UtilDataLoader.log
-log4j.appender.R.MaxFileSize=10MB
-log4j.appender.R.MaxBackupIndex=10
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{ISO8601} %p %t %c{1} - %m%n
-log4j.logger.org.apache.hadoop.chukwa.ikit.DataConfig=INFO, R

+ 7 - 6
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/ChukwaTTInstru.java

@@ -27,27 +27,28 @@ import org.apache.hadoop.mapred.*;
 /**
  * An instrumentation plugin for Hadoop, to trigger Chukwa-based task logfile collection.
  * 
- * WARNING:  This code depends on hadoop features that have not yet been committed.
- *   To allow it to compile, the key lines have been commented out, and marked with
- *   'PENDING'.
+ * WARNING:  This code depends on hadoop features only available in 0.19.
+ * It won't do any good if you try to use it with an earlier Hadoop.
  *
  */
-public class ChukwaTTInstru 
-extends TaskTrackerMetricsInst  //PENDING on getting new metrics code into Hadoop
+public class ChukwaTTInstru extends TaskTrackerMetricsInst 
 {
 
   private Map<TaskAttemptID, Long> stdOutAdaptors;
   private Map<TaskAttemptID, Long> stdErrAdaptors;
   private ChukwaAgentController chukwa;
+//  private TaskTrackerMetricsInst parent; //for chaining together multiple
+      //instrumentation subsystems
   
   public ChukwaTTInstru(TaskTracker t) {
-    super(t);  //PENDING
+    super(t);
     stdOutAdaptors = new HashMap<TaskAttemptID, Long>();
     stdErrAdaptors = new HashMap<TaskAttemptID, Long>();
     chukwa = new ChukwaAgentController();
   }
   
   public void reportTaskLaunch(TaskAttemptID taskid, File stdout, File stderr)  {
+//    parent.reportTaskLaunch(taskid, stdout, stderr);
     long stdoutID = chukwa.addFile("unknown-userdata", stdout.getAbsolutePath());
     long stderrID = chukwa.addFile("unknown-userdata", stderr.getAbsolutePath());
     stdOutAdaptors.put(taskid, stdoutID);

+ 146 - 82
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java

@@ -9,23 +9,23 @@
 
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import java.util.Calendar;
-import java.util.TimeZone;
 import java.util.Locale;
+import java.util.TimeZone;
 
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 
-import org.apache.hadoop.chukwa.util.RecordConstants;
-import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-
 /**
     ChukwaDailyRollingFileAppender is a slightly modified version of
     DailyRollingFileAppender, with modified versions of its
@@ -129,14 +129,13 @@ import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
     <p>Do not use the colon ":" character in anywhere in the
     <b>DatePattern</b> option. The text before the colon is interpeted
     as the protocol specificaion of a URL which is probably not what
-    you want.
+    you want. 
 
+*/
 
-    @author Eirik Lygre
-    @author Ceki G&uuml;lc&uuml; */
 public class ChukwaDailyRollingFileAppender extends FileAppender {
 
-
+	static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
   // The code assumes that the following constants are in a increasing
   // sequence.
   static final int TOP_OF_TROUBLE=-1;
@@ -149,6 +148,9 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
 
   static final String adaptorType = ChukwaAgentController.CharFileTailUTF8NewLineEscaped;
 
+  static final Object lock = new Object();
+  static String lastRotation = "";
+  
   /**
     The date pattern. By default, the pattern is set to
     "'.'yyyy-MM-dd" meaning daily rollover.
@@ -180,6 +182,9 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
   int checkPeriod = TOP_OF_TROUBLE;
 
   ChukwaAgentController chukwaClient;
+  boolean chukwaClientIsNull = true;
+  static final Object chukwaLock = new Object();
+  
   String chukwaClientHostname;
   int chukwaClientPortNum;
   long chukwaClientConnectNumRetry;
@@ -203,7 +208,7 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
   /**
      Instantiate a <code>DailyRollingFileAppender</code> and open the
      file designated by <code>filename</code>. The opened filename will
-     become the ouput destination for this appender.
+     become the output destination for this appender.
 
    */
   public ChukwaDailyRollingFileAppender (Layout layout, String filename,
@@ -336,12 +341,10 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
       return;
     }
 
+	
     // close current file, and rename it to datedFilename
     this.closeFile();
 
-    if (chukwaClient != null){
-      chukwaClient.pauseFile(getRecordType(),fileName);
-    }
 
     File target  = new File(scheduledFilename);
     if (target.exists()) {
@@ -363,19 +366,44 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
     }
     catch(IOException e) {
       errorHandler.error("setFile("+fileName+", false) call failed.");
-    }
-
-    //resume the adaptor for the file now that we have emptied it (i.e. rolled it over)
-    if (chukwaClient.isFilePaused(getRecordType(), fileName)){
-      chukwaClient.resumeFile(getRecordType(), fileName);
-    }
-    else {
-      LogLog.warn("chukwa appender for file " + fileName + " was not paused, so we didn't do resumeFile() for it");
-    }
-    
+    }    
     scheduledFilename = datedFilename;
   }
 
+  
+  private class ClientFinalizer extends Thread 
+  {
+	  private ChukwaAgentController chukwaClient = null;
+	  private String recordType = null;
+	  private String fileName = null;
+	  public ClientFinalizer(ChukwaAgentController chukwaClient,String recordType, String fileName)
+	  {
+		  this.chukwaClient = chukwaClient;
+		  this.recordType = recordType;
+		  this.fileName = fileName;
+	  }
+	    public synchronized void run() 
+	    {
+	      try 
+	      {
+	    	  if (chukwaClient != null)
+	    	  {
+	    		  log.debug("ShutdownHook: removing:" + fileName);
+	    		  chukwaClient.removeFile(recordType, fileName);
+	    	  }
+	    	  else
+	    	  {
+	    		  LogLog.warn("chukwaClient is null cannot do any cleanup");
+	    	  }
+	      } 
+	      catch (Throwable e) 
+	      {
+	    	  LogLog.warn("closing the controller threw an exception:\n" + e);
+	      }
+	    }
+	  }
+	  private ClientFinalizer clientFinalizer = null;
+  
   /**
    * This method differentiates DailyRollingFileAppender from its
    * super class.
@@ -384,64 +412,96 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
    * time to do a rollover. If it is, it will schedule the next
    * rollover time and then rollover.
    * */
-  protected void subAppend(LoggingEvent event) {
-    //we set up the chukwa adaptor here because this is the first
-    //point which is called after all setters have been called with
-    //their values from the log4j.properties file, in particular we
-    //needed to give setCukwaClientPortNum() and -Hostname() a shot
-    if (chukwaClient == null){
-        if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
-        chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
-        System.out.println("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
-      }
-      else{
-        chukwaClient = new ChukwaAgentController();
-        System.out.println("setup adaptor with no args, which means it used its defaults");
-      }
-        
-      //if they haven't specified, default to retrying every 10 seconds for 5 minutes
-      long retryInterval = chukwaClientConnectRetryInterval;
-      if (retryInterval == 0)
-        retryInterval = 1000;
-      long numRetries = chukwaClientConnectNumRetry;
-      if (numRetries == 0)
-        numRetries = 30;
-      long adaptorID = chukwaClient.addFile(getRecordType(), getFile(), numRetries, retryInterval);
-      if (adaptorID > 0){
-        System.out.println("Added file tailing adaptor to chukwa agent for file " + getFile());
-      }
-      else{
-        System.out.println("Chukwa adaptor not added, addFile(" + getFile() + ") returned " + adaptorID);
-      }
-    }
-    long n = System.currentTimeMillis();
-    if (n >= nextCheck) {
-      now.setTime(n);
-      nextCheck = rc.getNextCheckMillis(now);
-      try {
-        rollOver();
-      }
-      catch(IOException ioe) {
-        LogLog.error("rollOver() failed.", ioe);
-      }
-    }
-    //escape the newlines from record bodies and then write this record to the log file
-    this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+  protected void subAppend(LoggingEvent event) 
+  {
+	  try
+	  {
+		  //we set up the chukwa adaptor here because this is the first
+		  //point which is called after all setters have been called with
+		  //their values from the log4j.properties file, in particular we
+		  //needed to give setChukwaClientPortNum() and -Hostname() a shot
+		  
+		  // Make sure only one thread can do this
+		  // and use the boolean to avoid the first level locking
+		  if (chukwaClientIsNull)
+		  {
+			  synchronized(chukwaLock)
+			  {
+				  if (chukwaClient == null){
+					  if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
+						  chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
+						  log.debug("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
+					  }
+					  else{
+						  chukwaClient = new ChukwaAgentController();
+						  log.debug("setup adaptor with no args, which means it used its defaults");
+					  }
+
+					  chukwaClientIsNull = false;
+					  
+					  //if they haven't specified, default to retrying every minute for 2 hours
+					  long retryInterval = chukwaClientConnectRetryInterval;
+					  if (retryInterval == 0)
+						  retryInterval = 1000 * 60;
+					  long numRetries = chukwaClientConnectNumRetry;
+					  if (numRetries == 0)
+						  numRetries = 120;
+					  String log4jFileName = getFile();
+					  String recordType = getRecordType();
+					  long adaptorID = chukwaClient.addFile(recordType, log4jFileName, numRetries, retryInterval);
+
+					  // Setup a shutdownHook for the controller
+					  clientFinalizer = new ClientFinalizer(chukwaClient,recordType,log4jFileName);
+					  Runtime.getRuntime().addShutdownHook(clientFinalizer);
+
+					  
+					  if (adaptorID > 0){
+						  log.debug("Added file tailing adaptor to chukwa agent for file " + log4jFileName + "using this recordType :" + recordType);
+					  }
+					  else{
+						  log.debug("Chukwa adaptor not added, addFile(" + log4jFileName + ") returned " + adaptorID);
+					  }
+					  
+				  }				  
+			  }
+		  }
+		  
+
+		  long n = System.currentTimeMillis();
+		  if (n >= nextCheck) {
+			  now.setTime(n);
+			  nextCheck = rc.getNextCheckMillis(now);
+			  try {
+				  rollOver();
+			  }
+			  catch(IOException ioe) {
+				  LogLog.error("rollOver() failed.", ioe);
+			  }
+		  }
+		  //escape the newlines from record bodies and then write this record to the log file
+		  this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+
+		  if(layout.ignoresThrowable()) {
+			  String[] s = event.getThrowableStrRep();
+			  if (s != null) {
+				  int len = s.length;
+				  for(int i = 0; i < len; i++) {
+					  this.qw.write(s[i]);
+					  this.qw.write(Layout.LINE_SEP);
+				  }
+			  }
+		  }
+
+		  if(this.immediateFlush) {
+			  this.qw.flush();
+		  }		  
+	  }
+	  catch(Throwable e)
+	  {
+		  System.err.println("Exception in ChukwaRollingAppender: " + e.getMessage());
+		  e.printStackTrace();
+	  }
     
-    if(layout.ignoresThrowable()) {
-      String[] s = event.getThrowableStrRep();
-      if (s != null) {
-        int len = s.length;
-        for(int i = 0; i < len; i++) {
-          this.qw.write(s[i]);
-          this.qw.write(Layout.LINE_SEP);
-        }
-      }
-    }
-
-    if(this.immediateFlush) {
-      this.qw.flush();
-    }
   }
 
   public String getChukwaClientHostname() {
@@ -479,7 +539,11 @@ public class ChukwaDailyRollingFileAppender extends FileAppender {
  * */
 class RollingCalendar extends GregorianCalendar {
 
-  int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
+  /**
+	 * 
+	 */
+	private static final long serialVersionUID = 2153481574198792767L;
+int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
 
   RollingCalendar() {
     super();

+ 87 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java

@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.log4j;
+
+import java.io.*;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsException;
+import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class Log4JMetricsContext extends AbstractMetricsContext {
+
+  Logger out = null; //Logger.getLogger(Log4JMetricsContext.class);
+  
+  /* Configuration attribute names */
+//  protected static final String FILE_NAME_PROPERTY = "fileName";
+  protected static final String PERIOD_PROPERTY = "period";
+
+    
+  /** Creates a new instance of FileContext */
+  public Log4JMetricsContext() {}
+     
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
+  /*      
+    String fileName = getAttribute(FILE_NAME_PROPERTY);
+    if (fileName != null) {
+      file = new File(fileName);
+    }
+    */
+    out = Logger.getLogger("chukwa.hadoop.metrics."+contextName);
+    String periodStr = getAttribute(PERIOD_PROPERTY);
+    if (periodStr != null) {
+      int period = 0;
+      try {
+        period = Integer.parseInt(periodStr);
+      } catch (NumberFormatException nfe) {
+      }
+      if (period <= 0) {
+        throw new MetricsException("Invalid period: " + periodStr);
+      }
+      setPeriod(period);
+    }
+  }
+  
+  @Override
+  protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
+      throws IOException
+  {
+	JSONObject json = new JSONObject();
+    try {
+		json.put("contextName", contextName);
+		json.put("recordName", recordName);
+		json.put("chukwa_timestamp", System.currentTimeMillis());
+	    for (String tagName : outRec.getTagNames()) {
+            json.put(tagName, outRec.getTag(tagName));
+	    }
+	    for (String metricName : outRec.getMetricNames()) {
+	    	json.put(metricName, outRec.getMetric(metricName));
+	    }
+    } catch (JSONException e) {
+		// TODO Auto-generated catch block
+		e.printStackTrace();
+	}    
+    out.info(json.toString());
+  }
+
+}

+ 64 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.lang.Thread;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.lang.StringBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ErStreamHandler extends Thread{
+	InputStream inpStr;
+	String command;
+	boolean record;
+	
+    private static Log log = LogFactory.getLog(ErStreamHandler.class);	
+    
+	public ErStreamHandler(InputStream inpStr,String command,boolean record){
+		this.inpStr=inpStr;
+		this.command=command;
+		this.record=record;
+
+	}
+
+	public void run(){
+		try {
+			InputStreamReader inpStrd=new InputStreamReader(inpStr);
+			BufferedReader buffRd=new BufferedReader(inpStrd);
+			String line=null;
+			StringBuffer sb=new StringBuffer();
+			while((line=buffRd.readLine())!= null){
+                 sb.append(line);			
+			}
+			buffRd.close();
+			
+			if (record && sb.length()>0) {
+				log.error(command+" execution error:"+sb.toString());				
+			}
+			
+		}catch (Exception e){
+			log.error(command+" error:"+e.getMessage());
+		}
+	}
+	
+	
+}

+ 124 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java

@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.lang.Thread;
+import java.lang.management.ManagementFactory;
+import java.io.FileOutputStream;
+import java.sql.SQLException;
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.inputtools.mdl.TorqueInfoProcessor;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.util.PidFile;
+
+public class TorqueDataLoader {
+	   private static Log log = LogFactory.getLog("TorqueDataLoader");
+
+	    private TorqueInfoProcessor tp=null;
+        private PidFile loader=null;
+	  
+	    
+	    public TorqueDataLoader (DataConfig mdlConfig, int interval){
+	    	log.info("in torqueDataLoader");
+	   	    tp = new TorqueInfoProcessor(mdlConfig, interval);
+	   	    loader=new PidFile("TorqueDataLoader");
+	    }
+	    
+	    	    	    	    
+	    public void run(){
+ 	        boolean first=true;
+	   	    while(true){
+	       	   try{
+	   	           tp.setup(first);
+	   	           first=false;
+	   	        }catch (Exception ex){
+	   	    	   tp.shutdown();
+	   	           
+	   	    	   if (first){
+	   	    	      log.error("setup error");
+	   	    	      ex.printStackTrace();
+	   	    	      loader.clean(); // only call before system.exit()
+	   	    	      System.exit(1);
+	   	            }
+	   	           log.error("setup fail, retry after 10 minutes");
+	   	           try {
+	                     Thread.sleep(600*1000);
+	               } catch (InterruptedException e) {
+	                // TODO Auto-generated catch block
+	                	 log.error(e.getMessage());
+	               // e.printStackTrace();
+	               }
+	   		       continue;   		 
+	   		 
+	   	       }
+	   	     
+	   	       try{
+	   		        tp.run_forever();
+	   	       }catch (SQLException ex) {
+	   		        tp.shutdown();
+	   	    	    log.error("processor died, reconnect again after 10 minutes");
+	   	    	    ex.printStackTrace();
+	   		        try {
+	                     Thread.sleep(600*1000);
+	                } catch (InterruptedException e) {
+	                     // TODO Auto-generated catch block
+	                	    log.error(e.getMessage());
+	                     // e.printStackTrace();
+	                }
+	   	       }catch (Exception ex){
+	   		        try {
+	                     Thread.sleep(16*1000);
+	                } catch (InterruptedException e) {
+	                            ;
+	                }
+	   	    	   tp.shutdown();
+	   	    	   log.error("process died...."+ex.getMessage());
+	   	    	   loader.clean();
+	   	    	   System.exit(1);
+	   	       }
+	   	       
+	   	  }//while
+	   
+	    }
+	    
+	    
+		 public static void main(String[] args) {
+			   /* if (args.length < 2 || args[0].startsWith("-h")
+			        || args[0].startsWith("--h")) {
+			      System.out.println("Usage: UtilDataLoader interval(sec)");
+			      System.exit(1);
+			    }
+			    String interval = args[0];
+			    int intervalValue=Integer.parseInt(interval);
+			    */
+			   int intervalValue=60;
+
+
+	           DataConfig mdlConfig=new DataConfig();
+	           
+	           TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
+	           tdl.run();
+
+		 }        
+		
+}

+ 541 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java

@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.sql.SQLException;
+import java.sql.ResultSet;
+import java.lang.Exception;
+import java.util.Calendar;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.TreeMap;
+import java.util.Iterator;
+import java.lang.StringBuffer;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.lang.Thread;
+import java.util.Timer;
+import java.lang.ProcessBuilder;
+import java.lang.Process;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.InterruptedException;
+import java.lang.System;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.inputtools.mdl.TorqueTimerTask;
+import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+
+
+/**
+ * Polls a Torque/PBS server (over ssh) for HOD job information and logs
+ * per-job machine and timing data. Connection parameters come from the
+ * TORQUE_SERVER, TORQUE_HOME and DOMAIN system properties.
+ */
+public class TorqueInfoProcessor {
+    
+	private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
+    
+    // Polling interval in seconds; overridden by the constructor argument.
+    private int intervalValue=60;
+	private String torqueServer = null;
+	private String torqueBinDir= null;
+	private String domain = null;
+
+    // hodId -> job attributes (userId, numOfMachine, status, process,
+    // traceCheckCount); "process" tracks how far loading has progressed
+    // (0 = new, 1 = qstat data loaded, 2 = tracejob data loaded).
+    private TreeMap <String,TreeMap<String,String>> currentHodJobs;
+    
+    
+	/**
+	 * @param mdlConfig data-loader configuration (currently unused here —
+	 *                  all settings are read from system properties)
+	 * @param interval  polling interval in seconds
+	 */
+	public TorqueInfoProcessor(DataConfig mdlConfig, int interval){
+		this.intervalValue=interval;
+		
+		torqueServer=System.getProperty("TORQUE_SERVER");
+		torqueBinDir=System.getProperty("TORQUE_HOME")+File.separator+"bin";
+		domain=System.getProperty("DOMAIN");
+	    currentHodJobs=new TreeMap<String,TreeMap<String,String>>();
+	}
+	
+	
+	
+	// Intentionally a no-op: there is no persistent state to set up or
+	// recover in this implementation.
+	public void setup(boolean recover)throws Exception {
+	 }
+		 
+	 /**
+	  * Runs "qstat -a" on the Torque server (via ssh) and refreshes
+	  * currentHodJobs: new jobs are added, statuses of known jobs are
+	  * updated, and jobs that have disappeared from Torque are either
+	  * marked completed ("C") or dropped once fully processed.
+	  * A TorqueTimerTask kills the ssh process if it exceeds the timeout.
+	  */
+	 private void  getHodJobInfo() throws IOException {
+		 StringBuffer sb=new StringBuffer();
+		 sb.append(torqueBinDir).append("/qstat -a");
+	 
+		 String[] getQueueInfoCommand=new String [3];
+		 getQueueInfoCommand[0]="ssh";
+		 getQueueInfoCommand[1]=torqueServer;
+		 getQueueInfoCommand[2]=sb.toString();
+		
+		 
+         String command=getQueueInfoCommand[0]+" "+getQueueInfoCommand[1]+" "+getQueueInfoCommand[2];
+		 ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand);
+         
+		 Process p=pb.start();
+		 
+		 // Watchdog: destroys the child process if qstat hangs.
+		 Timer timeout=new Timer();
+		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,true);
+		 errorHandler.start();
+		 
+		 String line = null;
+		 boolean start=false;
+         TreeSet<String> jobsInTorque=new TreeSet<String>();
+		 while((line=result.readLine())!=null){
+			 // The "---" separator marks the end of the qstat header.
+			 if (line.startsWith("---")){				 
+				 start=true;
+				 continue;
+			 }
+
+			 if(start){
+				 String [] items=line.split("\\s+");
+				 if (items.length>=10){
+				     // Job id is of the form "<id>.<server>"; keep only <id>.
+				     String hodIdLong=items[0];				     
+				     String hodId=hodIdLong.split("[.]")[0];
+				     String userId=items[1];
+				     String numOfMachine=items[5];
+				     String status=items[9];
+				     jobsInTorque.add(hodId);
+                     if (!currentHodJobs.containsKey(hodId)) {
+                         TreeMap <String,String> aJobData=new TreeMap <String,String>();
+                     
+				         aJobData.put("userId", userId);
+				         aJobData.put("numOfMachine",numOfMachine);
+				         aJobData.put("traceCheckCount","0");
+                         aJobData.put("process", "0");
+				         aJobData.put("status",status);
+				         currentHodJobs.put(hodId,aJobData);
+				     }else {
+				    	 TreeMap <String,String> aJobData= currentHodJobs.get(hodId);
+				    	 aJobData.put("status", status);
+				         currentHodJobs.put(hodId,aJobData);
+				     }//if..else
+				 }				 
+			 }
+		 }//while
+		 
+         try {
+        	 errorHandler.join();
+         }catch (InterruptedException ie){
+        	 log.error(ie.getMessage());
+         }
+		 timeout.cancel();
+		 
+		 // Jobs tracked locally but no longer reported by qstat: mark them
+		 // completed if still being processed, otherwise schedule removal.
+		 Set<String> currentHodJobIds=currentHodJobs.keySet();
+		 Iterator<String> currentHodJobIdsIt=currentHodJobIds.iterator();
+		 TreeSet<String> finishedHodIds=new TreeSet<String>();
+		 while (currentHodJobIdsIt.hasNext()){
+			 String hodId=currentHodJobIdsIt.next();
+			 if (!jobsInTorque.contains(hodId)) {
+				TreeMap <String,String> aJobData=currentHodJobs.get(hodId);
+				String process=aJobData.get("process");
+				if (process.equals("0") || process.equals("1")) {	
+					aJobData.put("status", "C");
+				}else {
+					finishedHodIds.add(hodId);
+				}
+			 }
+		 }//while
+		 
+		 // Removal is done in a second pass to avoid mutating the map while
+		 // iterating over its key set.
+		 Iterator<String >finishedHodIdsIt=finishedHodIds.iterator();
+		 while (finishedHodIdsIt.hasNext()){
+			 String hodId=finishedHodIdsIt.next();
+			 currentHodJobs.remove(hodId);
+		 }
+		  		 		 	 
+	 }
+	 
+	 /**
+	  * Runs "qstat -f -1 <hodId>" on the Torque server (via ssh), parses the
+	  * ctime/mtime/etime timestamps and the exec_host list, and logs one
+	  * "HodId=..., Machine=..." line per allocated host plus a job summary.
+	  *
+	  * @return true when both exec_host and a valid start time were found
+	  *         (i.e. the qstat data was complete), false otherwise
+	  */
+	 private boolean loadQstatData(String hodId) throws IOException, SQLException {
+		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+		 String userId=aJobData.get("userId");
+		 
+		 StringBuffer sb=new StringBuffer();
+		 sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
+		 String[] qstatCommand=new String [3];
+		 qstatCommand[0]="ssh";
+		 qstatCommand[1]=torqueServer;
+		 qstatCommand[2]=sb.toString();
+		 
+         String command=qstatCommand[0]+" "+qstatCommand[1]+" "+qstatCommand[2];
+		 ProcessBuilder pb= new ProcessBuilder(qstatCommand);         
+		 Process p=pb.start();
+		 
+		 // Watchdog: destroys the child process if qstat hangs.
+		 Timer timeout=new Timer();
+		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
+		 errorHandler.start();
+		 String line=null;
+         String hosts=null;
+         long startTimeValue=-1;
+         // End/execute times default to "now" if qstat does not report them.
+         long endTimeValue=Calendar.getInstance().getTimeInMillis();
+         long executeTimeValue=Calendar.getInstance().getTimeInMillis();
+         boolean qstatfinished;
+        
+		 while((line=result.readLine())!=null){
+			 if (line.indexOf("ctime")>=0){
+				 String startTime=line.split("=")[1].trim();
+				 //Tue Sep  9 23:44:29 2008
+				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+				 Date startTimeDate;
+				try {
+					startTimeDate = sdf.parse(startTime);
+					 startTimeValue=startTimeDate.getTime();
+				} catch (ParseException e) {
+					// Unparsable ctime: leave startTimeValue at -1 (treated
+					// as "not found" below).
+					e.printStackTrace();
+				}
+				 
+			 }
+			 if (line.indexOf("mtime")>=0){
+				 String endTime=line.split("=")[1].trim();
+				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+				 Date endTimeDate;
+				try {
+					endTimeDate = sdf.parse(endTime);
+					endTimeValue=endTimeDate.getTime();
+				} catch (ParseException e) {
+					// Unparsable mtime: keep the "now" default.
+					e.printStackTrace();
+				}
+				 
+			 }
+			 if (line.indexOf("etime")>=0){
+				 String executeTime=line.split("=")[1].trim();
+				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+				 Date executeTimeDate;
+				try {
+					executeTimeDate = sdf.parse(executeTime);
+					executeTimeValue=executeTimeDate.getTime();
+				} catch (ParseException e) {
+					// Unparsable etime: keep the "now" default.
+					e.printStackTrace();
+				}
+				 
+			 }			 
+			 if (line.indexOf("exec_host")>=0){
+				 hosts=line.split("=")[1].trim();
+			 }
+		  }
+		 
+		  if (hosts!=null && startTimeValue>=0) {
+			 // exec_host entries are "+"-separated, each like "host/slot";
+			 // strip the trailing "/n" to get the bare machine name.
+			 String [] items2=hosts.split("[+]"); 
+			 int num=0;
+		     for (int i=0;i<items2.length;i++) {
+		    	 String machinetmp=items2[i];
+		    	 if( machinetmp.length()>3){
+ 			    	 String machine=items2[i].substring(0,items2[i].length()-2);
+            	     StringBuffer data=new StringBuffer();
+            	     data.append("HodId=").append(hodId);
+            	     data.append(", Machine=").append(machine);
+            	     if(domain!=null) {
+            	    	 data.append(".").append(domain);
+            	     }
+            	     log.info(data);
+            	     num++;   
+			      }
+		     } 	 
+			 Timestamp startTimedb=new Timestamp(startTimeValue);
+			 Timestamp endTimedb=new Timestamp(endTimeValue);
+			 StringBuffer data=new StringBuffer();
+			 long timeQueued=executeTimeValue-startTimeValue;
+			 data.append("HodID=").append(hodId);
+			 data.append(", UserId=").append(userId);		
+			 data.append(", StartTime=").append(startTimedb);
+			 data.append(", TimeQueued=").append(timeQueued);
+			 data.append(", NumOfMachines=").append(num);
+			 data.append(", EndTime=").append(endTimedb);
+    	     //log.info(data);
+			 qstatfinished=true;
+			 
+	      } else{
+		   		   
+             qstatfinished=false;
+          }
+		    
+          try {
+        	 errorHandler.join();
+          }catch (InterruptedException ie){
+        	 log.error(ie.getMessage());
+          }
+          result.close();
+          timeout.cancel();
+         	   
+          return qstatfinished;
+	 }
+	
+	 
+	 /**
+	  * Runs "tracejob -n 10 -l -m -s <hodId>" on the Torque server (via ssh)
+	  * and, if the job-end accounting record is found, logs per-machine and
+	  * summary lines (exit status, queue time, start/end time, host count).
+	  *
+	  * If no record is found, increments the job's traceCheckCount; after
+	  * two failed attempts the job is considered done anyway so it can be
+	  * dropped from tracking.
+	  *
+	  * @return true when the record was found or retries are exhausted
+	  */
+	 private boolean loadTraceJobData(String hodId) throws IOException,SQLException{
+		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+		 //String queue=aJobData.get("queue");
+		 String userId=aJobData.get("userId");
+		 String process=aJobData.get("process");
+		 //String numOfMachine=aJobData.get("numOfMachine");
+		 
+		 //StringBuffer traceJobsb=new StringBuffer();
+		 StringBuffer sb=new StringBuffer();
+		 sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
+	   	 //ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand.toString());
+		 String[] traceJobCommand=new String [3];
+		 traceJobCommand[0]="ssh";
+		 traceJobCommand[1]=torqueServer;
+		 traceJobCommand[2]=sb.toString();
+		 
+         String command=traceJobCommand[0]+" "+traceJobCommand[1]+" "+traceJobCommand[2];
+		 //System.out.println(command);
+		 ProcessBuilder pb= new ProcessBuilder(traceJobCommand);
+         
+         //String testCommand="/home/lyyang/work/chukwa/src/java/org/apache/hadoop/chukwa/ikit/sleeping";
+         //ProcessBuilder pb= new ProcessBuilder(testCommand);
+		 //pb.redirectErrorStream(false);
+
+		 Process p=pb.start();
+		 
+		 // Watchdog: destroys the child process if tracejob hangs.
+		 Timer timeout=new Timer();
+		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
+		 errorHandler.start();
+		 String line=null;
+		 /*
+		 BufferedReader error = new BufferedReader (new InputStreamReader(p.getErrorStream()));
+		 String line = null;
+		 boolean start=false;
+         TreeSet<String> jobsInTorque=new TreeSet<String>();
+         String errorLine = null;;
+         while((errorLine=error.readLine())!=null) {
+        	 //discard the error message;
+        	 ;
+         }
+         */
+         String exit_status=null;
+         String hosts=null;
+         long timeQueued=-1;
+         long startTimeValue=-1;
+         long endTimeValue=-1;
+         boolean findResult=false;
+
+        
+		 while((line=result.readLine())!=null&& ! findResult){
+			 // The job-end accounting line contains "end", "Exit_status" and
+			 // "qtime" key=value pairs; only that line is parsed.
+			 if (line.indexOf("end")>=0 &&line.indexOf("Exit_status")>=0 && line.indexOf("qtime")>=0){
+			      TreeMap <String,String> jobData=new TreeMap <String,String>() ;
+			      String [] items=line.split("\\s+");
+			      for (int i=0;i<items.length; i++) {
+			    	 String [] items2 = items[i].split("=");
+			    	 if (items2.length>=2){
+			    		 jobData.put(items2[0], items2[1]);
+			    	 }
+
+			      }
+	              // Timestamps are epoch seconds; truncate to whole minutes.
+	              String startTime=jobData.get("ctime");
+			      startTimeValue=Long.valueOf(startTime);
+			      startTimeValue=startTimeValue-startTimeValue%(60);
+			      Timestamp startTimedb=new Timestamp(startTimeValue*1000);
+			       
+			      String queueTime=jobData.get("qtime");
+			      long queueTimeValue=Long.valueOf(queueTime);
+			      
+			      String sTime=jobData.get("start");
+			      long sTimeValue=Long.valueOf(sTime);
+			      			      
+			      timeQueued=sTimeValue-queueTimeValue;
+			      
+			      String endTime=jobData.get("end");
+			      endTimeValue=Long.valueOf(endTime);
+			      endTimeValue=endTimeValue-endTimeValue%(60);
+			      Timestamp endTimedb=new Timestamp(endTimeValue*1000);
+			      
+			      exit_status=jobData.get("Exit_status");
+			      //if (process.equals("0")){
+			    	  hosts=jobData.get("exec_host");
+			    	  String [] items2=hosts.split("[+]");
+			    	  int num=0;
+			    	  for (int i=0;i<items2.length;i++) {
+			    		  String machinetemp=items2[i];
+			    		  if (machinetemp.length()>=3){
+	            		 
+			    			  String machine=items2[i].substring(0,items2[i].length()-2);
+			    			  StringBuffer data=new StringBuffer();
+			    			  data.append("HodId=").append(hodId);
+			    			  data.append(", Machine=").append(machine);
+		            	      if(domain!=null) {
+			            	   	 data.append(".").append(domain);
+			            	  }
+			    			  log.info(data.toString());
+			    			  num++;
+			    		  }  
+			    	  }
+			      
+			    	  StringBuffer data=new StringBuffer();
+			    	  data.append("HodID=").append(hodId);
+			    	  data.append(", UserId=").append(userId);		
+			    	  data.append(", Status=").append(exit_status);
+			    	  data.append(", TimeQueued=").append(timeQueued);
+			    	  data.append(", StartTime=").append(startTimedb);
+			    	  data.append(", EndTime=").append(endTimedb);
+			    	  data.append(", NumOfMachines=").append(num);
+			          log.info(data.toString());
+//			      } else{
+//			    	  StringBuffer data=new StringBuffer();
+//			    	  data.append("HodID=").append(hodId);
+//			    	  data.append(", TimeQueued=").append(timeQueued);
+//			    	  data.append(", EndTime=").append(endTimedb);
+//			    	  data.append(", Status=").append(exit_status);
+//			    	  log.info(data.toString());
+//			      }
+				  findResult=true;
+		          log.debug(" hod info for job "+hodId+" has been loaded ");
+			 }//if
+			 
+		}//while 
+
+         try {
+        	 errorHandler.join();
+         }catch (InterruptedException ie){
+        	 log.error(ie.getMessage());
+         }
+         
+        timeout.cancel();
+        boolean tracedone=false;
+        if (!findResult){
+        	
+            // Record the failed lookup; give up after the second attempt.
+            String traceCheckCount=aJobData.get("traceCheckCount");
+            int traceCheckCountValue=Integer.valueOf(traceCheckCount);
+            traceCheckCountValue=traceCheckCountValue+1;           
+            aJobData.put("traceCheckCount",String.valueOf(traceCheckCountValue));
+
+            
+            log.debug("did not find tracejob info for job "+hodId+", after "+traceCheckCountValue+" times checking");
+            if (traceCheckCountValue>=2){ 
+            	tracedone= true;
+            	
+//                StringBuffer deletesb1=new StringBuffer();
+//                deletesb1.append(" Delete from ").append(hodJobTable);
+//                deletesb1.append(" where hodid='").append(hodId).append("'");
+//                String delete1=deletesb1.toString();
+//                
+////                dbWriter.execute(delete1);
+//                
+//                StringBuffer deletesb2=new StringBuffer();
+//                deletesb2.append(" Delete from  ").append(hodMachineTable);
+//                deletesb2.append(" where hodid='").append(hodId).append("'");
+//                String delete2=deletesb2.toString();
+////                dbWriter.execute(delete2);
+            }
+        }
+        boolean finished=findResult|tracedone;
+       
+	   
+        return finished;
+      
+    //  return true;   
+	 }
+	 
+		 
+		 
+	 /**
+	  * Walks all tracked jobs and advances each one through the loading
+	  * state machine recorded in its "process" attribute:
+	  *   0 -> 1 : job running ("R") or exiting ("E") — load qstat data
+	  *   * -> 2 : job completed ("C") — load tracejob data
+	  * IOExceptions from the loaders are logged and the job is retried on
+	  * the next cycle.
+	  */
+	 private void process_data() throws SQLException{
+	
+		 long currentTime=System.currentTimeMillis();
+		 currentTime=currentTime-currentTime%(60*1000);
+		 // NOTE(review): timestamp is computed but not used below — possibly
+		 // a leftover from the removed database-writing path.
+		 Timestamp timestamp=new Timestamp(currentTime);
+		 
+		 Set<String> hodIds=currentHodJobs.keySet();
+		 
+		 Iterator<String> hodIdsIt=hodIds.iterator();
+		 while (hodIdsIt.hasNext()){
+			 String hodId=(String) hodIdsIt.next();
+			 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+			 //String queue=aJobData.get("queue");
+			 //String numOfMachine=aJobData.get("numOfMachine");
+			 String status=aJobData.get("status");
+			 String process=aJobData.get("process");
+			 if (process.equals("0") && (status.equals("R") ||status.equals("E"))){
+			     try {
+			    	 boolean result=loadQstatData(hodId);
+			    	 if (result){
+			    		 aJobData.put("process","1");
+				    	 currentHodJobs.put(hodId, aJobData);			    		 
+			    	 }
+			     }catch (IOException ioe){
+			    	 log.error("load qsat data Error:"+ioe.getMessage());
+			    	  
+			     }
+			 }
+			 if (! process.equals("2") && status.equals("C")){
+				 try {
+			    	boolean result=loadTraceJobData(hodId);
+			    	
+			    	if (result){
+			    		aJobData.put("process","2");
+			    		currentHodJobs.put(hodId, aJobData);
+			    	}
+				 }catch (IOException ioe){
+					 log.error("loadTraceJobData Error:"+ioe.getMessage());
+				 }
+			 }//if
+			
+			 
+		 } //while
+		 
+	 }
+	 
+	 
+	 /**
+	  * One polling cycle: refresh the job list from qstat, then process the
+	  * tracked jobs. An IOException from qstat skips the cycle quietly;
+	  * an SQLException from processing is logged and rethrown so the caller
+	  * can reconnect.
+	  */
+	 private void handle_jobData() throws SQLException{		 
+		 try{
+		     getHodJobInfo();
+		 }catch (IOException ex){
+			 log.error("getQueueInfo Error:"+ex.getMessage());
+			 return;
+		 }
+		 try{    
+	         process_data();
+		 } catch (SQLException ex){
+			 log.error("process_data Error:"+ex.getMessage());
+			 throw ex;
+		 }
+	 }
+     
+	 
+	 /**
+	  * Main loop: runs one polling cycle then sleeps intervalValue seconds,
+	  * forever. Only exits by propagating an SQLException from
+	  * handle_jobData(); the caller is expected to reconnect and retry.
+	  */
+	 public void run_forever() throws SQLException{    	            
+     	  while(true){
+          	  handle_jobData();
+              try {
+                  log.debug("sleeping ...");
+                  Thread.sleep(this.intervalValue*1000);
+              } catch (InterruptedException e) {
+                  log.error(e.getMessage()); 	
+              }
+          }
+     }
+     
+	 
+	 // Intentionally a no-op: this processor holds no connections or other
+	 // resources that need explicit release.
+	 public void shutdown(){
+     }
+   	  
+	
+
+}

+ 50 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java

@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.util.TimerTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Watchdog for externally spawned Torque commands: when scheduled on a
+ * Timer, run() destroys the wrapped Process and logs the timeout. Callers
+ * cancel the Timer once the command completes normally.
+ */
+public class TorqueTimerTask extends TimerTask{
+	private Process ps=null;
+	private String command;
+	
+	private static Log log = LogFactory.getLog(TorqueTimerTask.class);
+    // Timeout in seconds before the wrapped process is killed.
+    //public static int timeoutInterval=300;
+    public static int timeoutInterval=180;
+    
+	public TorqueTimerTask() {
+		super();
+		// No-arg constructor leaves ps null; run() would NPE if this task
+		// were ever scheduled without a process.
+	}
+	
+	/**
+	 * @param process the child process to kill on timeout
+	 * @param command human-readable command line, used in the log message
+	 */
+	public  TorqueTimerTask(Process process,String command){
+    	super();
+    	this.ps=process;
+    	this.command=command;
+    	
+    }
+	
+	// Fired by the Timer when the timeout elapses: kill the process and log.
+	public void run() {
+		ps.destroy();
+	    log.error("torque command: "+command+" timed out");
+		
+	}
+
+}

+ 45 - 25
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java

@@ -1,47 +1,67 @@
 package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
 
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Timer;
+import java.util.TimerTask;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
 import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.apache.hadoop.chukwa.util.PidFile;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-public class Exec extends ExecPlugin
-{
+// Exec is now a TimerTask: instead of running the wrapped command once,
+// it executes it periodically (every PERIOD seconds) and logs the output.
+public class Exec extends TimerTask {
 	private static Log log = LogFactory.getLog(Exec.class);
 	private String cmde = null;
-	
-	public Exec(String[] cmds)
-	{
+    private static PidFile pFile = null;
+    private Timer timer = null;
+    private IPlugin plugin = null;
+    
+	public Exec(String[] cmds) {
 		StringBuffer c = new StringBuffer();
 		for(String cmd : cmds) {
 			c.append(cmd);
 			c.append(" ");
 		}
 		cmde = c.toString();
+		plugin = new ExecHelper(cmds);
+	}
+	// One timer tick: run the command via the plugin; log stdout on
+	// success, warn and exit the JVM on a negative status.
+	public void run() {
+		try {
+			JSONObject result = plugin.execute();
+			if (result.getInt("status") < 0) {
+				System.out.println("Error");
+				log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
+				System.exit(-1);
+			} else {
+				log.info(result.get("stdout"));
+			}
+		} catch(JSONException e) {
+			log.error("Exec output unparsable:"+this.cmde);
+		}
 	}
-	
-	@Override
-	public String getCmde()
-	{
+	public String getCmde() {
 		return cmde;
 	}
-
-	public static void main(String[] args) throws JSONException
-	{
-		IPlugin plugin = new Exec(args);
-		JSONObject result = plugin.execute();		
-		if (result.getInt("status") < 0)
-		{
-			System.out.println("Error");
-			log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
-			System.exit(-1);
-		}
-		else
-		{
-			log.info(result.get("stdout"));
-		}
-		System.exit(0);
+    
+	// Registers a pid file (removed by the shutdown hook), reads the
+	// PERIOD system property (default 60s), and schedules this task.
+	public static void main(String[] args) {
+   	    pFile=new PidFile(System.getProperty("RECORD_TYPE")+"-data-loader");
+   	    Runtime.getRuntime().addShutdownHook(pFile);
+   	    int period = 60;
+   	    try {
+			if(System.getProperty("PERIOD")!=null) {
+			    period = Integer.parseInt(System.getProperty("PERIOD"));
+			}
+        } catch(NumberFormatException ex) {
+			ex.printStackTrace();
+			System.out.println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
+			System.out.println("PERIOD should be numeric format of seconds.");        	
+			System.exit(0);
+        }
+   	    Timer timer = new Timer();
+		timer.schedule(new Exec(args),0, period*1000);
 	}
 }

+ 34 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java

@@ -0,0 +1,34 @@
+package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.apache.hadoop.chukwa.util.PidFile;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Thin ExecPlugin wrapper used by Exec: joins the command array into a
+ * single command line and exposes it via getCmde(); execution itself is
+ * inherited from ExecPlugin.
+ */
+public class ExecHelper extends ExecPlugin {
+	private static Log log = LogFactory.getLog(ExecHelper.class);
+	private String cmde = null;
+    // NOTE(review): pFile and timer are never used in this class — possibly
+    // copied from Exec and safe to remove.
+    private static PidFile pFile = null;
+    private Timer timer = null;
+    
+	public ExecHelper(String[] cmds) {
+		StringBuffer c = new StringBuffer();
+		for(String cmd : cmds) {
+			c.append(cmd);
+			c.append(" ");
+		}
+		cmde = c.toString();
+	}
+	
+	// Returns the space-joined command line (with a trailing space).
+	public String getCmde() {
+		return cmde;
+	}    
+}

+ 0 - 97
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java

@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
-
-import java.io.*;
-
-import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsException;
-import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
-import org.apache.hadoop.metrics.spi.OutputRecord;
-import org.apache.log4j.Logger;
-
-public class Log4JMetricsContext extends AbstractMetricsContext {
-
-  static Logger out = Logger.getLogger(Log4JMetricsContext.class);
-  
-  /* Configuration attribute names */
-//  protected static final String FILE_NAME_PROPERTY = "fileName";
-  protected static final String PERIOD_PROPERTY = "period";
-
-    
-  /** Creates a new instance of FileContext */
-  public Log4JMetricsContext() {}
-     
-  public void init(String contextName, ContextFactory factory) {
-    super.init(contextName, factory);
-  /*      
-    String fileName = getAttribute(FILE_NAME_PROPERTY);
-    if (fileName != null) {
-      file = new File(fileName);
-    }
-    */    
-    String periodStr = getAttribute(PERIOD_PROPERTY);
-    if (periodStr != null) {
-      int period = 0;
-      try {
-        period = Integer.parseInt(periodStr);
-      } catch (NumberFormatException nfe) {
-      }
-      if (period <= 0) {
-        throw new MetricsException("Invalid period: " + periodStr);
-      }
-      setPeriod(period);
-    }
-  }
-  
-  @Override
-  protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
-      throws IOException
-  {
-    StringBuilder writer = new StringBuilder();
-    String separator = " ";
-    writer.append("contextName=");
-    writer.append(contextName);
-    
-    writer.append(separator);
-    writer.append("recordName=");
-    writer.append(recordName);
-    
-
-    writer.append(separator);
-    writer.append("chukwa_timestamp="+ System.currentTimeMillis());
-    writer.append(recordName);
-    
-    for (String tagName : outRec.getTagNames()) {
-      writer.append(separator);
-      writer.append(tagName);
-      writer.append("=");
-      writer.append(outRec.getTag(tagName));
-    }
-    for (String metricName : outRec.getMetricNames()) {
-      writer.append(separator);
-      writer.append(metricName);
-      writer.append("=");
-      writer.append(outRec.getMetric(metricName));
-    }
-    
-    out.info(writer.toString());
-//    out.println(writer);
-  }
-
-}

+ 78 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java

@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Loads cluster-name -> JDBC-URL mappings from the "jdbc.conf" file under
+ * $CHUKWA_CONF_DIR. Each line of the file is "cluster=url"; the parsed map
+ * is held in the static clusterMap.
+ */
+public class ClusterConfig {
+    public static HashMap<String, String> clusterMap = new HashMap<String, String>();
+    private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
+    /**
+     * Reads a whole file into a String, one line at a time, normalizing
+     * line endings to the platform separator. IOExceptions are printed and
+     * an empty/partial string is returned.
+     */
+    static public String getContents(File aFile) {
+        //...checks on aFile are elided
+        StringBuffer contents = new StringBuffer();
+   
+        try {
+          //use buffering, reading one line at a time
+          //FileReader always assumes default encoding is OK!
+          BufferedReader input =  new BufferedReader(new FileReader(aFile));
+          try {
+             String line = null; //not declared within while loop
+             /*
+              * readLine is a bit quirky :
+              * it returns the content of a line MINUS the newline.
+              * it returns null only for the END of the stream.
+              * it returns an empty String if two newlines appear in a row.
+              */
+             while (( line = input.readLine()) != null){
+                contents.append(line);
+                contents.append(System.getProperty("line.separator"));
+             }
+          } finally {
+             input.close();
+          }
+        }
+          catch (IOException ex){
+          ex.printStackTrace();
+        }
+
+        return contents.toString();
+    }
+
+    // Parses jdbc.conf: each non-empty line is split on the first '=' into
+    // cluster name and URL. NOTE(review): a line without '=' would raise
+    // ArrayIndexOutOfBoundsException here.
+    public ClusterConfig() {
+        File cc = new File(path+"jdbc.conf");
+        String buffer = getContents(cc);
+        String[] lines = buffer.split("\n");
+        for(String line: lines) {
+            String[] data = line.split("=",2);
+            clusterMap.put(data[0],data[1]);
+        }
+    }
+
+    /** @return the JDBC URL configured for the cluster, or null if unknown */
+    public String getURL(String cluster) {
+        String url = clusterMap.get(cluster);
+        return url; 
+    }
+
+    /** @return an iterator over all configured cluster names */
+    public Iterator<String> getClusters() {
+        Set<String> keys = clusterMap.keySet();
+        Iterator<String> i = keys.iterator();
+        return i;
+    }    
+}

+ 114 - 0
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java

@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class PidFile extends Thread {
+	
+	String name;
+	private static Log log = LogFactory.getLog(PidFile.class);
+	private static FileLock lock = null;
+        private static FileOutputStream pidFileOutput = null;
+	
+	public PidFile(String name){
+		this.name=name;
+		try {
+		    init();
+		} catch(IOException ex) {
+			clean();
+			System.exit(-1);
+		}
+	}
+	
+	public void init() throws IOException{
+  	     String pidLong=ManagementFactory.getRuntimeMXBean().getName();
+  	     String[] items=pidLong.split("@");
+  	     String pid=items[0];
+	     String chukwaPath=System.getProperty("CHUKWA_HOME");
+	     StringBuffer pidFilesb=new StringBuffer();
+	     String pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
+	     pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+	     try{
+	    	 File existsFile = new File(pidDir);
+	    	 if(!existsFile.exists()) {
+		    	 boolean success = (new File(pidDir)).mkdirs();
+		    	 if(!success) {
+		    		 throw(new IOException());
+		    	 }
+	    	 }
+	         File pidFile= new File(pidFilesb.toString());
+
+	         pidFileOutput= new FileOutputStream(pidFile);
+             pidFileOutput.write(pid.getBytes());
+	         pidFileOutput.flush();
+	         FileChannel channel = pidFileOutput.getChannel();
+	         PidFile.lock = channel.tryLock();
+             if(PidFile.lock!=null) {
+	             log.debug("Initlization succeeded...");
+             } else {
+                 throw(new IOException());
+             }
+	     }catch (IOException ex){
+	    	 System.out.println("Initializaiton failed: can not write pid file.");
+	    	 log.error("Initialization failed...");
+	    	 log.error(ex.getMessage());
+	    	 System.exit(-1);
+	    	 throw ex;
+	    	 
+	     }
+	   
+	}	
+	
+	public void clean(){
+        String chukwaPath=System.getenv("CHUKWA_HOME");
+        StringBuffer pidFilesb=new StringBuffer();
+        pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid"); 
+        String pidFileName=pidFilesb.toString();
+
+        File pidFile=new File(pidFileName);
+        if (!pidFile.exists()) {
+    	   log.error("Delete pid file, No such file or directory: "+pidFileName);
+        } else {
+           try {
+               lock.release();
+	       pidFileOutput.close();
+           } catch(IOException e) {
+               log.error("Unable to release file lock: "+pidFileName);
+           }
+        }
+
+        boolean result=pidFile.delete();
+        if (!result){
+    	   log.error("Delete pid file failed, "+pidFileName);
+        }
+	}
+
+	public void run() {
+		clean();
+	}
+}