
HADOOP-3585. FailMon package for hardware failure monitoring and
analysis of anomalies. (Ioannis Koltsidas via dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@686181 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur, 17 years ago
commit ad4ca72c1b
32 files changed, 3876 insertions(+), 2 deletions(-)
  1. CHANGES.txt (+3, -0)
  2. build.xml (+3, -2)
  3. src/contrib/failmon/README (+97, -0)
  4. src/contrib/failmon/bin/failmon.sh (+54, -0)
  5. src/contrib/failmon/bin/scheduler.py (+235, -0)
  6. src/contrib/failmon/build.xml (+133, -0)
  7. src/contrib/failmon/conf/commons-logging.properties (+25, -0)
  8. src/contrib/failmon/conf/failmon.properties (+80, -0)
  9. src/contrib/failmon/conf/global.config (+39, -0)
  10. src/contrib/failmon/conf/hosts.list (+10, -0)
  11. src/contrib/failmon/conf/log4j.properties (+40, -0)
  12. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java (+154, -0)
  13. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java (+101, -0)
  14. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java (+41, -0)
  15. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java (+458, -0)
  16. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java (+151, -0)
  17. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java (+120, -0)
  18. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java (+154, -0)
  19. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java (+136, -0)
  20. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java (+268, -0)
  21. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LogParser.java (+214, -0)
  22. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/MonitorJob.java (+43, -0)
  23. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Monitored.java (+53, -0)
  24. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/NICParser.java (+140, -0)
  25. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/OfflineAnonymizer.java (+132, -0)
  26. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/PersistentState.java (+163, -0)
  27. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/RunOnce.java (+120, -0)
  28. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SMARTParser.java (+206, -0)
  29. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SensorsParser.java (+112, -0)
  30. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SerializedRecord.java (+163, -0)
  31. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/ShellParser.java (+102, -0)
  32. src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SystemLogParser.java (+126, -0)

+ 3 - 0
CHANGES.txt

@@ -86,6 +86,9 @@ Trunk (unreleased changes)
     number of files/bytes copied in a particular run to support incremental
     updates and mirroring. (TszWo (Nicholas), SZE via cdouglas)
 
+    HADOOP-3585. FailMon package for hardware failure monitoring and 
+    analysis of anomalies. (Ioannis Koltsidas via dhruba)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay intialization of datanode block verification till

+ 3 - 2
build.xml

@@ -799,7 +799,8 @@
     	<packageset dir="src/contrib/streaming/src/java"/>
     	<packageset dir="src/contrib/data_join/src/java"/>
     	<packageset dir="src/contrib/index/src/java"/>
-
+	<packageset dir="src/contrib/failmon/src/java/"/> 
+	
         <link href="${javadoc.link.java}"/>
 
         <classpath >
@@ -816,7 +817,7 @@
        <group title="contrib: Streaming" packages="org.apache.hadoop.streaming*"/>
        <group title="contrib: DataJoin" packages="org.apache.hadoop.contrib.utils.join*"/>
        <group title="contrib: Index" packages="org.apache.hadoop.contrib.index*"/>
-
+       <group title="contrib: FailMon" packages="org.apache.hadoop.contrib.failmon*"/>
     </javadoc>
   </target>	
 

+ 97 - 0
src/contrib/failmon/README

@@ -0,0 +1,97 @@
+****************** FailMon Quick Start Guide ***********************
+
+This document is a guide to quickly setting up and running FailMon.
+For more information and details please see the FailMon User Manual.
+
+***** Building FailMon *****
+
+Normally, FailMon lies under <hadoop-dir>/src/contrib/failmon, where
+<hadoop-dir> is the Hadoop project root folder. To compile it,
+one can either run ant for the whole Hadoop project, i.e.:
+
+$ cd <hadoop-dir>
+$ ant
+
+or run ant only for FailMon:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ ant
+
+The above will compile FailMon and place all class files under
+<hadoop-dir>/build/contrib/failmon/classes.
+
+By invoking:
+
+$ cd <hadoop-dir>/src/contrib/failmon
+$ ant tar
+
+FailMon is packaged as a standalone jar application in
+<hadoop-dir>/src/contrib/failmon/failmon.tar.gz.
+
+
+***** Deploying FailMon *****
+
+There are two ways FailMon can be deployed in a cluster:
+
+a) Within Hadoop, in which case the whole Hadoop package is uploaded
+to the cluster nodes. In that case, nothing else needs to be done on
+individual nodes.
+
+b) Independently of the Hadoop deployment, i.e., by uploading
+failmon.tar.gz to all nodes and uncompressing it. In that case, the
+bin/failmon.sh script needs to be edited; the environment variable
+HADOOPDIR should point to the root directory of the Hadoop
+distribution. Also, the location of the Hadoop configuration files
+should be specified in the property 'hadoop.conf.path' in file
+conf/failmon.properties. Note that these files refer to the HDFS in
+which we want to store the FailMon data (which can potentially be
+different from the one on the cluster we are monitoring).
+
+We assume that, either way, FailMon is placed in the same directory
+on all nodes, which is typical for most clusters. If this is not
+feasible, one should create on every node a symbolic link at the same
+path, pointing to that node's FailMon directory.
+
+One should also edit the conf/failmon.properties file on each node to
+set the desired property values, although the default values are
+expected to serve most practical cases. Refer to the FailMon User
+Manual for details on the various properties and configuration parameters.
+
+
+***** Running FailMon *****
+
+In order to run FailMon using a node to do the ad-hoc scheduling of
+monitoring jobs, one needs to edit the hosts.list file to specify the
+list of machine hostnames on which FailMon is to be run. Also, in file
+conf/global.config the username used to connect to the machines has to
+be specified (passwordless SSH is assumed) in property 'ssh.username'.
+In property 'failmon.dir', the path to the FailMon folder has to be
+specified as well (it is assumed to be the same on all machines in the
+cluster). Then one only needs to invoke the command:
+
+$ cd <hadoop-dir>
+$ bin/scheduler.py
+
+to start the system.
+
+
+***** Merging HDFS files *****
+
+For the purpose of merging the files created on HDFS by FailMon, the
+following command can be used:
+
+$ cd <hadoop-dir>
+$ bin/failmon.sh --mergeFiles
+
+This will concatenate all files in the HDFS folder (pointed to by the
+'hdfs.upload.dir' property in conf/failmon.properties file) into a
+single file, which will be placed in the same folder. Also, the
+location of the Hadoop configuration files should be specified in the
+property 'hadoop.conf.path' in file conf/failmon.properties. Note that
+these files refer to the HDFS in which we have stored the FailMon data
+(which can potentially be different from the one on the cluster we are
+monitoring). Also, the scheduler.py script can be set up to merge the
+HDFS files when their number surpasses a configurable limit (see
+'conf/global.config' file).
+
+Please refer to the FailMon User Manual for more details.

+ 54 - 0
src/contrib/failmon/bin/failmon.sh

@@ -0,0 +1,54 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# First we need to determine whether Failmon has been distributed with
+# Hadoop, or as standalone. In the latter case failmon.jar will lie in
+# the current directory.
+
+JARNAME="failmon.jar"
+HADOOPDIR=""
+CLASSPATH=""
+
+if [ `ls -l | grep src | wc -l` == 0 ]
+then
+    # standalone binary
+    if [ -n "$1" ] && [ "$1" == "--mergeFiles" ]
+    then
+	jar -ufe $JARNAME org.apache.hadoop.contrib.failmon.HDFSMerger
+        java -jar $JARNAME
+    else
+    	jar -ufe $JARNAME org.apache.hadoop.contrib.failmon.RunOnce
+	java -jar $JARNAME $*
+    fi
+else
+    # distributed with Hadoop
+    HADOOPDIR=`pwd`/../../../
+    CLASSPATH=$CLASSPATH:$HADOOPDIR/build/contrib/failmon/classes
+    CLASSPATH=$CLASSPATH:$HADOOPDIR/build/classes
+    CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/commons-logging-api-1*.jar`
+    CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/commons-logging-1*.jar`
+    CLASSPATH=$CLASSPATH:`ls -1 $HADOOPDIR/lib/log4j-*.jar`
+#    echo $CLASSPATH
+    if [ -n "$1" ] && [ "$1" == "--mergeFiles" ]
+    then
+        java -cp $CLASSPATH org.apache.hadoop.contrib.failmon.HDFSMerger
+    else
+        java -cp $CLASSPATH org.apache.hadoop.contrib.failmon.RunOnce $*
+    fi
+fi
+

+ 235 - 0
src/contrib/failmon/bin/scheduler.py

@@ -0,0 +1,235 @@
+#!/usr/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Schedule FailMon execution for the nodes listed in hosts.list, according to
+# the properties file conf/global.config.
+
+import time
+import ConfigParser
+import subprocess
+import threading
+import random
+
+jobs = []
+username = "user"
+connections = 10
+failmonDir = ""
+maxFiles = 100
+
+# This class represents a thread that connects to a set of cluster
+# nodes to locally execute monitoring jobs. These jobs are specified
+# as a shell command in the constructor.
+class sshThread (threading.Thread):
+
+    def __init__(self, threadname, username, command, failmonDir):
+        threading.Thread.__init__(self)
+        self.name = threadname
+        self.username = username
+        self.command = command
+        self.failmonDir = failmonDir
+        self.hosts = []
+
+    def addHost(self, host):
+        self.hosts.append(host)
+        
+    def run (self):
+        for host in self.hosts:
+            toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
+            print "Thread", self.name, "invoking command on", host, ":\t", toRun, "...",
+            subprocess.check_call(toRun)
+            print "Done!"
+
+# This class represents a monitoring job. The param member is a string
+# that can be passed in the '--only' list of jobs given to the Java
+# class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
+# node.
+class Job:
+    def __init__(self, param, interval):
+        self.param = param
+        self.interval = interval
+        self.counter = interval
+        return
+
+    def reset(self):
+        self.counter = self.interval
+
+# This function reads the configuration file to get the values of the
+# configuration parameters.
+def getJobs(file):
+    global username
+    global connections
+    global jobs
+    global failmonDir
+    global maxFiles
+    
+    conf = ConfigParser.SafeConfigParser()
+    conf.read(file)
+
+    username = conf.get("Default", "ssh.username")
+    connections = int(conf.get("Default", "max.connections"))
+    failmonDir = conf.get("Default", "failmon.dir")
+    maxFiles = conf.get("Default", "hdfs.files.max")
+    
+    # Hadoop Log
+    interval = int(conf.get("Default", "log.hadoop.interval"))
+
+    if interval != 0:
+        jobs.append(Job("hadoopLog", interval))
+
+    # System Log
+    interval = int(conf.get("Default", "log.system.interval"))
+
+    if interval != 0:
+        jobs.append(Job("systemLog", interval))
+
+    # NICs
+    interval = int(conf.get("Default", "nics.interval"))
+
+    if interval != 0:
+        jobs.append(Job("nics", interval))
+
+    # CPU
+    interval = int(conf.get("Default", "cpu.interval"))
+
+    if interval != 0:
+        jobs.append(Job("cpu", interval))
+
+    # Disks
+    interval = int(conf.get("Default", "disks.interval"))
+
+    if interval != 0:
+        jobs.append(Job("disks", interval))
+
+    # sensors
+    interval = int(conf.get("Default", "sensors.interval"))
+
+    if interval != 0:
+        jobs.append(Job("sensors", interval))
+
+    # upload
+    interval = int(conf.get("Default", "upload.interval"))
+
+    if interval != 0:
+        jobs.append(Job("upload", interval))
+
+    return
+
+
+# Compute the gcd (Greatest Common Divisor) of two integers
+def GCD(a, b):
+    assert isinstance(a, int)
+    assert isinstance(b, int)
+
+    while a:
+        a, b = b%a, a
+
+    return b
+
+# Compute the gcd (Greatest Common Divisor) of a list of integers
+def listGCD(joblist):
+    assert isinstance(joblist, list)
+
+    if (len(joblist) == 1):
+        return joblist[0].interval
+
+    g = GCD(joblist[0].interval, joblist[1].interval)
+
+    for i in range (2, len(joblist)):
+        g = GCD(g, joblist[i].interval)
+        
+    return g
+
+# Merge all failmon files created on the HDFS into a single file
+def mergeFiles():
+    global username
+    global failmonDir
+    hostList = []
+    hosts = open('./conf/hosts.list', 'r')
+    for host in hosts:
+        hostList.append(host.strip().rstrip())
+    randomHost = random.sample(hostList, 1)
+    mergeCommand = "bin/failmon.sh --mergeFiles"
+    toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
+    print "Invoking command on", randomHost, ":\t", mergeCommand, "...",
+    subprocess.check_call(toRun)
+    print "Done!"
+    return
+
+# The actual scheduling is done here
+def main():
+    getJobs("./conf/global.config")
+
+    for job in jobs:
+        print "Configuration: ", job.param, "every", job.interval, "seconds"
+        
+    globalInterval = listGCD(jobs)
+        
+    while True :
+        time.sleep(globalInterval)
+        params = []
+        
+        for job in jobs:
+            job.counter -= globalInterval
+            
+            if (job.counter <= 0):
+                params.append(job.param)
+                job.reset()
+                
+        if (len(params) == 0):
+            continue;
+                    
+        onlyStr = "--only " + params[0]
+        for i in range(1, len(params)):
+            onlyStr += ',' + params[i] 
+                
+        command = "bin/failmon.sh " + onlyStr
+
+        # execute on all nodes
+        hosts = open('./conf/hosts.list', 'r')
+        threadList = []
+        # create a thread for every connection
+        for i in range(0, connections):
+            threadList.append(sshThread(i, username, command, failmonDir))
+
+        # assign some hosts to every thread/connection
+        cur = 0;
+        for host in hosts:
+            threadList[cur].addHost(host.strip().rstrip())
+            cur += 1
+            if (cur == len(threadList)):
+                cur = 0    
+
+        for ready in threadList:
+            ready.start()
+
+        for ssht in threading.enumerate():
+            if ssht != threading.currentThread():
+                ssht.join()
+
+        # if an upload has been done, then maybe we need to merge the
+        # HDFS files
+        if "upload" in params:
+            mergeFiles()
+
+    return
+
+
+if __name__ == '__main__':
+    main()
+

+ 133 - 0
src/contrib/failmon/build.xml

@@ -0,0 +1,133 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="failmon" default="compile">
+
+  <import file="../build-contrib.xml"/>
+
+  <property name="jarfile" value="${build.dir}/${name}.jar"/>
+
+  <target name="jar" depends="compile" unless="skip.contrib">
+    <!-- Make sure that the hadoop jar has been created -->
+<!-- This works, but causes findbugs to fail
+    <subant antfile="build.xml" target="jar">
+      <fileset dir="../../.." includes="build.xml"/>
+    </subant>
+-->
+    <!-- Copy the required files so that the jar can run independently
+	 of Hadoop source code -->
+    
+    <mkdir dir="lib"/>
+  
+    <copy todir="lib">
+      <fileset dir="${hadoop.root}/lib/"
+	       includes="commons-logging-*, log4j*"/>
+    </copy>
+  
+    <copy todir="lib">
+      <fileset dir="${hadoop.root}/build/"
+	       includes="hadoop-*"/>
+    </copy>
+  
+  <!-- create the list of files to add to the classpath -->
+  <fileset dir="${basedir}" id="class.path">
+    <include name="lib/commons-logging*.jar"/>
+    <include name="lib/log4j*.jar"/>
+    <include name="lib/hadoop-*.jar"/>
+  </fileset>
+  
+  <pathconvert pathsep=" " property="failmon-class-path" refid="class.path">
+    <map from="${basedir}/" to=""/>
+  </pathconvert>
+
+    <echo message="contrib: ${name}"/>
+    <jar jarfile="${jarfile}" basedir="${build.classes}">
+      <manifest>
+        <attribute name="Main-Class" value="org.apache.hadoop.contrib.failmon.RunOnce"/>
+	<attribute name="Class-Path" value="${failmon-class-path}"/> 
+      </manifest>
+    </jar>
+
+  </target>
+
+  
+  <!-- Override test target to copy sample data -->
+  <target name="test" depends="compile-test, compile, compile-examples" if="test.available">
+    <echo message="contrib: ${name}"/>
+    <delete dir="${hadoop.log.dir}"/>
+    <mkdir dir="${hadoop.log.dir}"/>
+    <delete dir="${build.test}/sample"/>
+    <mkdir dir="${build.test}/sample"/>
+    <copy todir="${build.test}/sample">
+      <fileset dir="${root}/sample"/>
+    </copy>
+    <junit
+      printsummary="yes" showoutput="${test.output}" 
+      haltonfailure="no" fork="yes" maxmemory="256m"
+      errorProperty="tests.failed" failureProperty="tests.failed"
+      timeout="${test.timeout}">
+      
+      <sysproperty key="test.build.data" value="${build.test}/data"/>
+      <sysproperty key="build.test" value="${build.test}"/>
+      <sysproperty key="contrib.name" value="${name}"/>
+      
+      <!-- requires fork=yes for: 
+        relative File paths to use the specified user.dir 
+        classpath to use build/contrib/*.jar
+      -->
+      <sysproperty key="user.dir" value="${build.test}/data"/>
+      
+      <sysproperty key="fs.default.name" value="${fs.default.name}"/>
+      <sysproperty key="hadoop.test.localoutputfile" value="${hadoop.test.localoutputfile}"/>
+      <sysproperty key="hadoop.log.dir" value="${hadoop.log.dir}"/>
+      <classpath refid="test.classpath"/>
+      <formatter type="${test.junit.output.format}" />
+      <batchtest todir="${build.test}" unless="testcase">
+        <fileset dir="${src.test}"
+                 includes="**/Test*.java" excludes="**/${test.exclude}.java" />
+      </batchtest>
+      <batchtest todir="${build.test}" if="testcase">
+        <fileset dir="${src.test}" includes="**/${testcase}.java"/>
+      </batchtest>
+    </junit>
+    <fail if="tests.failed">Tests failed!</fail>
+
+  </target>
+  
+  <target name="tar" depends="jar">
+
+    <copy todir=".">
+      <fileset dir="${hadoop.root}/build/contrib/failmon/"
+	       includes="failmon.jar"/>
+    </copy>
+    
+    <tar tarfile="${name}.tar" 
+	 basedir=".." 
+	 includes="${name}/**"
+	 excludes="${name}/${name}.tar.gz, ${name}/src/**, ${name}/logs/**, ${name}/build.xml*"/>
+    <gzip zipfile="${name}.tar.gz" src="${name}.tar"/>
+    <delete file="${name}.tar"/>
+    <delete file="${name}.jar"/>
+
+    <move file="${name}.tar.gz" todir="${build.dir}"/>
+    <echo message= "${hadoop.root}/build/contrib/failmon/${name}.jar"/>
+    
+  </target>
+  
+</project>

+ 25 - 0
src/contrib/failmon/conf/commons-logging.properties

@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#Logging Implementation
+
+#Log4J
+org.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger
+
+#JDK Logger
+#org.apache.commons.logging.Log=org.apache.commons.logging.impl.Jdk14Logger

+ 80 - 0
src/contrib/failmon/conf/failmon.properties

@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# HDFS properties
+hdfs.upload.dir = /failmon
+hadoop.conf.path = ../../../conf
+
+# Hadoop Log file properties
+log.hadoop.enabled = true
+log.hadoop.filenames = /home/hadoop/hadoop-0.17.0/logs/
+# set to non-zero only for continuous mode:
+log.hadoop.interval = 0
+log.hadoop.dateformat = \\d{4}-\\d{2}-\\d{2}
+log.hadoop.timeformat = \\d{2}:\\d{2}:\\d{2}
+
+# System Log file properties
+log.system.enabled = true
+log.system.filenames = /var/log/messages
+# set to non-zero only for continuous mode:
+log.system.interval = 0
+log.system.dateformat = (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d+)
+log.system.timeformat = \\d{2}:\\d{2}:\\d{2}
+
+# Network Interfaces
+nic.enabled = true
+nic.list = eth0, eth1
+# set to non-zero only for continuous mode:
+nic.interval = 0
+
+# CPUs & Motherboard
+cpu.enabled = true
+# set to non-zero only for continuous mode:
+cpu.interval = 0
+
+# Disk devices. For every device listed under disks.list, the corresponding
+# property disks./dev/xxx.source specifies a file from which the output of
+# "sudo smartctl --all /dev/xxx" can be read by a regular user. If this
+# property is missing, super-user privileges are assumed and the smartctl
+# command will be invoked directly.
+
+disks.enabled = true
+disks.list = /dev/sda, /dev/sdb, /dev/sdc, /dev/sdd, /dev/hda, /dev/hdb, /dev/hdc, /dev/hdd
+#disks./dev/sda.source = hda.smart
+# set to non-zero only for continuous mode:
+disks.interval = 0
+
+# lm-sensors polling
+sensors.enabled = true
+# set to non-zero only for continuous mode:
+sensors.interval = 0
+
+# Executor thread properties	
+executor.interval.min = 1	
+
+# Anonymization properties
+anonymizer.hash.hostnames = false
+anonymizer.hash.ips = false
+anonymizer.hash.filenames = false
+anonymizer.hostname.suffix = apache.org
+
+# Local files options
+local.tmp.filename = failmon.dat
+local.tmp.compression = false
+# set to non-zero only for continuous mode:
+local.upload.interval = 0

+ 39 - 0
src/contrib/failmon/conf/global.config

@@ -0,0 +1,39 @@
+[Default]
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# general settings
+
+# the username to use to connect to cluster nodes
+ssh.username = user
+# the maximum number of SSH connections to keep open at any time
+max.connections = 2
+# the directory in which FailMon lies 
+failmon.dir = /home/user/hadoop-core-trunk/src/contrib/failmon
+# the maximum number of HDFS files to allow FailMon to create. After
+# this limit is surpassed, all HDFS files will be concatenated into
+# one file.
+hdfs.files.max = 100
+
+# iteration intervals
+log.hadoop.interval = 0
+log.system.interval = 0
+nics.interval = 10
+cpu.interval = 10
+disks.interval = 0
+sensors.interval = 0
+upload.interval = 20

+ 10 - 0
src/contrib/failmon/conf/hosts.list

@@ -0,0 +1,10 @@
+host00
+host01
+host02
+host03
+host04
+host05
+host06
+host07
+host08
+host09

+ 40 - 0
src/contrib/failmon/conf/log4j.properties

@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Define some default values that can be overridden by system properties
+failmon.log.dir=logs
+failmon.log.file=failmon.log
+
+log4j.rootLogger= INFO, simpleFile, console
+
+# Logging Threshold
+log4j.threshhold=ALL
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+log4j.appender.simpleFile=org.apache.log4j.FileAppender
+log4j.appender.simpleFile.layout=org.apache.log4j.PatternLayout
+log4j.appender.simpleFile.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+log4j.appender.simpleFile.file= ${failmon.log.dir}/${failmon.log.file}

+ 154 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Anonymizer.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**********************************************************
+ * This class provides anonymization for SerializedRecord objects. It 
+ * anonymizes all hostnames, IP addresses and file names/paths
+ * that appear in EventRecords gathered from the logs
+ * and other system utilities. Such values are hashed using a
+ * cryptographically safe one-way-hash algorithm (MD5).
+ * 
+ **********************************************************/
+
+public class Anonymizer {
+
+  /**
+   * Anonymize hostnames, IP addresses and file names/paths
+   * that appear in fields of a SerializedRecord.
+   *
+   * @param sr the input SerializedRecord
+   *
+   * @return the anonymized SerializedRecord
+   */
+  public static SerializedRecord anonymize(SerializedRecord sr)
+      throws Exception {
+
+    String hostname = sr.get("hostname");
+
+    if (hostname == null)
+      throw new Exception("Malformed SerializedRecord: no hostname found");
+
+    if ("true".equalsIgnoreCase(Environment
+        .getProperty("anonymizer.hash.hostnames"))) {
+      // hash the node's hostname
+      anonymizeField(sr, "message", hostname, "_hn_");
+      anonymizeField(sr, "hostname", hostname, "_hn_");
+      // hash all other hostnames
+      String suffix = Environment.getProperty("anonymizer.hostname.suffix");
+      if (suffix != null)
+        anonymizeField(sr, "message", "(\\S+\\.)*" + suffix, "_hn_");
+    }
+
+    if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.ips"))) {
+      // hash all ip addresses
+      String ipPattern = "(\\d{1,3}\\.){3}\\d{1,3}";
+      anonymizeField(sr, "message", ipPattern, "_ip_");
+      anonymizeField(sr, "ips", ipPattern, "_ip_");
+      // if multiple ips are present for a node:
+      int i = 0;
+      while (sr.get("ips" + "#" + i) != null)
+        anonymizeField(sr, "ips" + "#" + i++, ipPattern, "_ip_");
+
+      if ("NIC".equalsIgnoreCase(sr.get("type")))
+        anonymizeField(sr, "ipAddress", ipPattern, "_ip_");
+    }
+
+    if ("true".equalsIgnoreCase(Environment
+        .getProperty("anonymizer.hash.filenames"))) {
+      // hash every filename present in messages
+      anonymizeField(sr, "message", "\\s+/(\\S+/)*[^:\\s]*", " _fn_");
+      anonymizeField(sr, "message", "\\s+hdfs://(\\S+/)*[^:\\s]*",
+          " hdfs://_fn_");
+    }
+
+    return sr;
+  }
+
+  /**
+   * Anonymize hostnames, ip addresses and file names/paths
+   * that appear in fields of an EventRecord, after it gets
+   * serialized into a SerializedRecord.
+   * 
+   * @param er the input EventRecord
+   * 
+   * @return the anonymized SerializedRecord
+   */   
+  public static SerializedRecord anonymize(EventRecord er) throws Exception {
+    return anonymize(new SerializedRecord(er));
+  }
+
+  
+  private static String anonymizeField(SerializedRecord sr, String fieldName,
+      String pattern, String prefix) {
+    String txt = sr.get(fieldName);
+
+    if (txt == null)
+      return null;
+    else {
+      String anon = getMD5Hash(pattern);
+      sr.set(fieldName, txt.replaceAll(pattern, (prefix == null ? "" : prefix)
+          + anon));
+      return txt;
+    }
+  }
+
+  /**
+   * Create the MD5 digest of an input text.
+   * 
+   * @param text the input text
+   * 
+   * @return the hexadecimal representation of the MD5 digest
+   */   
+  public static String getMD5Hash(String text) {
+    MessageDigest md;
+    byte[] md5hash = new byte[32];
+    try {
+      md = MessageDigest.getInstance("MD5");
+      md.update(text.getBytes("iso-8859-1"), 0, text.length());
+      md5hash = md.digest();
+    } catch (NoSuchAlgorithmException e) {
+      e.printStackTrace();
+    } catch (UnsupportedEncodingException e) {
+      e.printStackTrace();
+    }
+    return convertToHex(md5hash);
+  }
+
+  private static String convertToHex(byte[] data) {
+    StringBuffer buf = new StringBuffer();
+    for (int i = 0; i < data.length; i++) {
+      int halfbyte = (data[i] >>> 4) & 0x0F;
+      int two_halfs = 0;
+      do {
+        if ((0 <= halfbyte) && (halfbyte <= 9))
+          buf.append((char) ('0' + halfbyte));
+        else
+          buf.append((char) ('a' + (halfbyte - 10)));
+        halfbyte = data[i] & 0x0F;
+      } while (two_halfs++ < 1);
+    }
+    return buf.toString();
+  }
+
+}
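
As a quick illustration of the Anonymizer API added above, the following is a
minimal, hypothetical sketch (not part of this commit; the class name
AnonymizerExample and the sample hostname are made up). It assumes the FailMon
configuration is in place, since anonymize() reads the anonymizer.* properties
through Environment, and whether values are actually hashed depends on the
anonymizer.hash.* flags:

    package org.apache.hadoop.contrib.failmon;

    import java.net.InetAddress;
    import java.util.Calendar;

    public class AnonymizerExample {
      public static void main(String[] args) throws Exception {
        // Hash a single sensitive value, as Anonymizer does internally.
        System.out.println(Anonymizer.getMD5Hash("node01.apache.org"));

        // Anonymize a whole record: the EventRecord is serialized into a
        // SerializedRecord and its hostname/IP/filename fields are rewritten.
        EventRecord er = new EventRecord("node01.apache.org",
            InetAddress.getAllByName("localhost"), Calendar.getInstance(),
            "CPU", "Unknown", "CPU", "-");
        SerializedRecord sr = Anonymizer.anonymize(er);
        System.out.println(sr.get("hostname"));
      }
    }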

+ 101 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/CPUParser.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.Calendar;
+
+/**********************************************************
+ * Objects of this class parse the /proc/cpuinfo file to 
+ * gather information about present processors in the system.
+ *
+ **********************************************************/
+
+
+public class CPUParser extends ShellParser {
+
+ /**
+  * Constructs a CPUParser
+  */
+  public CPUParser() {
+    super();
+  }
+
+  /**
+   * Reads and parses /proc/cpuinfo and creates an appropriate 
+   * EventRecord that holds the desirable information.
+   * 
+   * @param s unused parameter
+   * 
+   * @return the EventRecord created
+   */
+  public EventRecord query(String s) throws Exception {
+    StringBuffer sb = Environment.runCommand("cat /proc/cpuinfo");
+    EventRecord retval = new EventRecord(InetAddress.getLocalHost()
+        .getCanonicalHostName(), InetAddress.getAllByName(InetAddress.getLocalHost()
+        .getHostName()), Calendar.getInstance(), "CPU", "Unknown", "CPU", "-");
+
+    retval.set("processors", findAll("\\s*processor\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    retval.set("model name", findPattern("\\s*model name\\s*:\\s*(.+)", sb
+        .toString(), 1));
+
+    retval.set("frequency", findAll("\\s*cpu\\s*MHz\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    retval.set("physical id", findAll("\\s*physical\\s*id\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    retval.set("core id", findAll("\\s*core\\s*id\\s*:\\s*(\\d+)", sb
+        .toString(), 1, ", "));
+
+    return retval;
+  }
+
+  /**
+   * Invokes query() to do the parsing and handles parsing errors. 
+   * 
+   * @return an array of EventRecords that holds one element that represents
+   * the current state of /proc/cpuinfo
+   */
+  
+  public EventRecord[] monitor() {
+
+    EventRecord[] recs = new EventRecord[1];
+
+    try {
+      recs[0] = query(null);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return recs;
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    return ("CPU Info parser");
+  }
+
+}
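
To show how a parser is driven, here is a small hypothetical sketch (not part
of this commit; CPUParserExample is a made-up name). It assumes a Linux host
with the FailMon configuration available, since query() shells out to read
/proc/cpuinfo:

    package org.apache.hadoop.contrib.failmon;

    public class CPUParserExample {
      public static void main(String[] args) {
        CPUParser parser = new CPUParser();
        // monitor() wraps query(null) and handles parsing errors itself.
        for (EventRecord rec : parser.monitor()) {
          if (rec != null && rec.isValid()) {
            System.out.println(rec);  // sorted field dump via toString()
          }
        }
      }
    }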

+ 41 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Continuous.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.IOException;
+
+/**********************************************************
+ * This class runs FailMon in a continuous mode on the local
+ * node.
+ * 
+ **********************************************************/
+
+public class Continuous {
+
+  public static void main(String[] args) {
+
+
+    Environment.prepare("failmon.properties");
+
+    Executor ex = new Executor(null);
+    new Thread(ex).start();
+
+  }
+
+}

+ 458 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Environment.java

@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.*;
+import org.apache.log4j.PropertyConfigurator;
+
+/**********************************************************
+ * This class provides various methods for interaction with
+ * the configuration and the operating system environment. Also
+ * provides some helper methods for use by other classes in
+ * the package.
+ **********************************************************/
+
+public class Environment {
+
+  public static final int DEFAULT_LOG_INTERVAL = 3600;
+
+  public static final int DEFAULT_POLL_INTERVAL = 360;
+
+  public static int MIN_INTERVAL = 5;
+
+  public static final int MAX_OUTPUT_LENGTH = 51200;
+
+  public static Log LOG;
+  
+  static Properties fmProperties = new Properties();
+
+  static boolean superuser = false;
+
+  static boolean ready = false;
+
+  /**
+   * Initializes structures needed by other methods. Also determines
+   * whether the executing user has superuser privileges. 
+   *  
+   */
+  public static void prepare(String fname) {
+
+    if (!"Linux".equalsIgnoreCase(System.getProperty("os.name"))) {
+      System.err.println("Linux system required for FailMon. Exiting...");
+      System.exit(0);
+    }
+
+    System.setProperty("log4j.configuration", "conf/log4j.properties");
+    PropertyConfigurator.configure("conf/log4j.properties");
+    LOG = LogFactory.getLog("org.apache.hadoop.contrib.failmon");
+    logInfo("********** FailMon started ***********");
+
+    // read parseState file
+    PersistentState.readState("conf/parsing.state");
+    
+    try {
+      FileInputStream propFile = new FileInputStream(fname);
+      fmProperties.load(propFile);
+      propFile.close();
+    } catch (FileNotFoundException e1) {
+      e1.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    ready = true;
+
+    try {
+      String sudo_prompt = "passwd_needed:";
+      String echo_txt = "access_ok";
+      
+      Process p = Runtime.getRuntime().exec("sudo -S -p " + sudo_prompt + " echo " + echo_txt );
+      InputStream inps = p.getInputStream();
+      InputStream errs = p.getErrorStream();
+      
+      while (inps.available() < echo_txt.length() && errs.available() < sudo_prompt.length())
+	Thread.sleep(100);
+
+      byte [] buf;
+      String s;
+      
+      if (inps.available() >= echo_txt.length()) {
+        buf = new byte[inps.available()];
+        inps.read(buf);
+        s = new String(buf);
+        if (s.startsWith(echo_txt)) {
+          superuser = true;
+	  logInfo("Superuser privileges found!");
+	} else {
+	  // no need to read errs
+	  superuser = false;
+	  logInfo("Superuser privileges not found.");
+	}
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Fetches the value of a property from the configuration file.
+   * 
+   *  @param key the name of the property
+   *  
+   *  @return the value of the property, if it exists and
+   *  null otherwise
+   */
+  public static String getProperty(String key) {
+    if (!ready)
+      prepare("conf/failmon.properties");
+    return fmProperties.getProperty(key);
+  }
+
+  /**
+   * Sets the value of a property in the configuration file.
+   * 
+   *  @param key the name of the property
+   *  @param value the new value for the property
+   *  
+   */
+  
+  public static void setProperty(String key, String value) {
+    fmProperties.setProperty(key, value);
+  }
+
+  /**
+   * Scans the configuration file to determine which monitoring
+   * utilities are available in the system. For each one of them, a
+   * job is created. All such jobs are scheduled and executed by
+   * Executor.
+   * 
+   * @return an ArrayList that contains jobs to be executed by the Executor.
+   */
+  public static ArrayList<MonitorJob> getJobs() {
+
+    ArrayList<MonitorJob> monitors = new ArrayList<MonitorJob>();
+    int timeInt = 0;
+
+    // for Hadoop Log parsing
+    String [] fnames_r = getProperty("log.hadoop.filenames").split(",\\s*");
+    String tmp = getProperty("log.hadoop.enabled");
+
+    String [] fnames = expandDirs(fnames_r, ".*(.log).*");
+
+    timeInt = setValue("log.hadoop.interval", DEFAULT_LOG_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp) && fnames[0] != null)
+      for (String fname : fnames) {
+        File f = new File(fname);
+        if (f.exists() && f.canRead()) {
+          monitors.add(new MonitorJob(new HadoopLogParser(fname), "hadoopLog", timeInt));
+	  logInfo("Created Monitor for Hadoop log file: " + f.getAbsolutePath());
+	} else if (!f.exists())
+	  logInfo("Skipping Hadoop log file " + fname + " (file not found)");
+	else
+	  logInfo("Skipping Hadoop log file " + fname + " (permission denied)");
+    }
+    
+    
+    // for System Log parsing
+    fnames_r = getProperty("log.system.filenames").split(",\\s*");
+    tmp = getProperty("log.system.enabled");
+
+    fnames = expandDirs(fnames_r, ".*(messages).*");
+
+    timeInt = setValue("log.system.interval", DEFAULT_LOG_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp))
+      for (String fname : fnames) {
+        File f = new File(fname);
+        if (f.exists() && f.canRead()) {
+          monitors.add(new MonitorJob(new SystemLogParser(fname), "systemLog", timeInt));
+	  logInfo("Created Monitor for System log file: " + f.getAbsolutePath());
+        } else if (!f.exists())
+	  logInfo("Skipping system log file " + fname + " (file not found)");
+	else
+	  logInfo("Skipping system log file " + fname + " (permission denied)");
+      }
+        
+
+    // for network interfaces
+    tmp = getProperty("nic.enabled");
+
+    timeInt = setValue("nics.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp)) {
+      monitors.add(new MonitorJob(new NICParser(), "nics", timeInt));
+      logInfo("Created Monitor for NICs");
+    }
+
+    // for cpu
+    tmp = getProperty("cpu.enabled");
+
+    timeInt = setValue("cpu.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp)) {
+      monitors.add(new MonitorJob(new CPUParser(), "cpu", timeInt));
+      logInfo("Created Monitor for CPUs");
+    }
+
+    // for disks
+    tmp = getProperty("disks.enabled");
+
+    timeInt = setValue("disks.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp)) {
+      // check privileges if a disk with no disks./dev/xxx/.source is found
+      boolean smart_present = checkExistence("smartctl");
+      int disks_ok = 0;
+      String devicesStr = getProperty("disks.list");
+      String[] devices = null;
+
+      if (devicesStr != null)
+        devices = devicesStr.split(",\\s*");
+      
+      for (int i = 0; i< devices.length; i++) {
+        boolean file_present = false;
+        boolean disk_present = false;
+        
+        String fileloc = getProperty("disks." + devices[i] + ".source");
+        if (fileloc != null && fileloc.equalsIgnoreCase("true"))
+          file_present = true;
+        
+        if (!file_present) 
+          if (superuser) {
+              StringBuffer sb = runCommand("sudo smartctl -i " + devices[i]);
+              String patternStr = "[(failed)(device not supported)]";
+              Pattern pattern = Pattern.compile(patternStr);
+              Matcher matcher = pattern.matcher(sb.toString());
+              if (matcher.find(0))
+                disk_present = false;
+              else
+                disk_present = true;            
+          }
+        if (file_present || (disk_present && smart_present)) {
+          disks_ok++;
+        } else
+          devices[i] = null;
+      } 
+      
+      // now remove disks that dont exist
+      StringBuffer resetSB = new StringBuffer();
+      for (int j = 0; j < devices.length; j++) {
+        resetSB.append(devices[j] == null ? "" : devices[j] + ", ");
+	if (devices[j] != null)
+	    logInfo("Found S.M.A.R.T. attributes for disk " + devices[j]);
+      }
+      // fix the property
+      if (resetSB.length() >= 2)
+        setProperty("disks.list", resetSB.substring(0, resetSB.length() - 2));
+      
+      if (disks_ok > 0) {
+        monitors.add(new MonitorJob(new SMARTParser(), "disks", timeInt));
+	logInfo("Created Monitor for S.M.A.R.T disk attributes");
+      }
+    }
+
+    // for lm-sensors
+    tmp = getProperty("sensors.enabled");
+
+    timeInt = setValue("sensors.interval", DEFAULT_POLL_INTERVAL);
+    
+    if ("true".equalsIgnoreCase(tmp) && checkExistence("sensors")) {
+      monitors.add(new MonitorJob(new SensorsParser(), "sensors", timeInt));
+      logInfo("Created Monitor for lm-sensors output");
+    }
+
+    return monitors;
+  }
+
+  /**
+   * Determines the minimum interval at which the executor thread
+   * needs to wake upto execute jobs. Essentially, this is interval 
+   * equals the GCD of intervals of all scheduled jobs. 
+   * 
+   *  @param monitors the list of scheduled jobs
+   *  
+   *  @return the minimum interval between two scheduled jobs
+   */
+  public static int getInterval(ArrayList<MonitorJob> monitors) {
+    String tmp = getProperty("executor.interval.min");
+    if (tmp != null)
+      MIN_INTERVAL = Integer.parseInt(tmp);
+
+    int[] monIntervals = new int[monitors.size()];
+
+    for (int i = 0; i < monitors.size(); i++)
+      monIntervals[i] = monitors.get(i).interval;
+
+    return Math.max(MIN_INTERVAL, gcd(monIntervals));
+  }
+
+  /**
+   * Checks whether a specific shell command is available
+   * in the system. 
+   * 
+   *  @param cmd the command to check against
+   *
+   *  @return true if the command is available, false otherwise
+   */
+  public static boolean checkExistence(String cmd) {
+    StringBuffer sb = runCommand("which " + cmd);
+    if (sb.length() > 1)
+      return true;
+
+    return false;
+  }
+
+  /**
+   * Runs a shell command in the system and provides a StringBuffer
+   * with the output of the command.
+   * 
+   *  @param cmd an array of string that form the command to run 
+   *  
+   *  @return a StringBuffer that contains the output of the command 
+   */
+  public static StringBuffer runCommand(String[] cmd) {
+    StringBuffer retval = new StringBuffer(MAX_OUTPUT_LENGTH);
+    Process p;
+    try {
+      p = Runtime.getRuntime().exec(cmd);
+      InputStream tmp = p.getInputStream();
+      p.waitFor();
+      int c;
+      while ((c = tmp.read()) != -1)
+        retval.append((char) c);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    return retval;
+  }
+
+  /**
+   * Runs a shell command in the system and provides a StringBuffer
+   * with the output of the command.
+   * 
+   *  @param cmd the command to run 
+   *  
+   *  @return a StringBuffer that contains the output of the command 
+   */
+  public static StringBuffer runCommand(String cmd) {
+    return runCommand(cmd.split("\\s+"));
+  }
+
+  /**
+   * Determines the greatest common divisor (GCD) of two integers.
+   * 
+   *  @param m the first integer
+   *  @param n the second integer
+   *  
+   *  @return the greatest common divisor of m and n
+   */
+  public static int gcd(int m, int n) {
+    if (m == 0 && n == 0)
+      return 0;
+    if (m < n) {
+      int t = m;
+      m = n;
+      n = t;
+    }
+    int r = m % n;
+    if (r == 0) {
+      return n;
+    } else {
+      return gcd(n, r);
+    }
+  }
+
+  /**
+   * Determines the greatest common divisor (GCD) of a list
+   * of integers.
+   * 
+   *  @param numbers the list of integers to process
+   *  
+   *  @return the greatest common divisor of all numbers
+   */
+  public static int gcd(int[] numbers) {
+
+    if (numbers.length == 1)
+      return numbers[0];
+
+    int g = gcd(numbers[0], numbers[1]);
+
+    for (int i = 2; i < numbers.length; i++)
+      g = gcd(g, numbers[i]);
+
+    return g;
+  }
+
+  private static String [] expandDirs(String [] input, String patternStr) {
+
+    ArrayList<String> fnames = new ArrayList<String>();
+    Pattern pattern = Pattern.compile(patternStr);
+    Matcher matcher;
+    File f;
+    
+    for (String fname : input) {
+      f = new File(fname);
+      if (f.exists()) {
+	if (f.isDirectory()) {
+	  // add all matching files
+	  File [] fcs = f.listFiles();
+	  for (File fc : fcs) {
+	    matcher = pattern.matcher(fc.getName());
+	    if (matcher.find() && fc.isFile())
+	      fnames.add(fc.getAbsolutePath());
+	  }
+	} else {
+	  // normal file, just add to output
+	  fnames.add(f.getAbsolutePath());
+	}
+      }
+    }
+    return fnames.toArray(input);
+  }
+
+  private static int setValue(String propname, int defaultValue) {
+
+    String v = getProperty(propname);
+
+    if (v != null)
+      return Integer.parseInt(v);
+    else
+      return defaultValue;
+  }
+
+  
+  public static void logInfo(String str) {
+    LOG.info(str);
+  }
+}
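
The executor wake-up period computed by getInterval() above is simply the GCD
of all job intervals, floored at MIN_INTERVAL (bin/scheduler.py mirrors the
same idea in Python). A tiny illustrative sketch, with made-up interval values
(not part of this commit):

    package org.apache.hadoop.contrib.failmon;

    public class IntervalExample {
      public static void main(String[] args) {
        // Hypothetical job intervals in seconds.
        int[] intervals = {360, 600, 3600};
        int g = Environment.gcd(intervals);                         // 120
        System.out.println(Math.max(Environment.MIN_INTERVAL, g));  // 120
      }
    }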

+ 151 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/EventRecord.java

@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+
+/**********************************************************
+ * Objects of this class represent metrics collected for 
+ * a specific hardware source. Each EventRecord contains a HashMap of 
+ * (key, value) pairs, each of which represents a property of
+ * the metered value. For instance, when parsing a log file, an
+ * EventRecord is created for each log entry, which contains 
+ * the hostname and the ip addresses of the node, timestamp of
+ * the log entry, the actual message etc. Each and every EventRecord
+ * contains the hostname of the machine on which it was collected,
+ * its IP address and the time of collection.
+ * 
+ * The main purpose of this class is to provide a uniform format
+ * for records collected from various system compontents (logs,
+ * ifconfig, smartmontools, lm-sensors etc). All metric values are 
+ * converted into this format after they are collected by a
+ * Monitored object.
+ *
+ **********************************************************/
+
+public class EventRecord {
+
+  HashMap<String, Object> fields;
+
+  /**
+   * Create the EventRecord given the most common properties
+   * among different metric types.
+   */
+  public EventRecord(String _hostname, Object [] _ips, Calendar _timestamp,
+      String _type, String _logLevel, String _source, String _message) {
+    fields = new HashMap<String, Object>();
+    fields.clear();
+    set("hostname", _hostname);
+    set("ips", _ips);
+    set("timestamp", _timestamp);
+    set("type", _type);
+    set("logLevel", _logLevel);
+    set("source", _source);
+    set("message", _message);
+  }
+
+  /**
+   * Create the EventRecord with no fields other than "invalid" as
+   * the hostname. This is only used as a dummy.
+   */
+  public EventRecord() {
+    // creates an invalid record
+    fields = new HashMap<String, Object>();
+    fields.clear();
+    set("hostname", "invalid");
+  }
+
+  /**
+   * Return the HashMap of properties of the EventRecord.
+   * 
+   * @return a HashMap that contains all properties of the record.
+   */
+  public final HashMap<String, Object> getMap() {
+    return fields;
+  }
+
+  /**
+   * Set the value of a property of the EventRecord.
+   * 
+   * @param fieldName the name of the property to set
+   * @param fieldValue the value of the property to set
+   * 
+   */
+  public void set(String fieldName, Object fieldValue) {
+    if (fieldValue != null)
+      fields.put(fieldName, fieldValue);
+  }
+
+  /**
+   * Get the value of a property of the EventRecord.
+   * If the property with the specific key is not found,
+   * null is returned.
+   * 
+   * @param fieldName the name of the property to get.
+   */
+  public Object get(String fieldName) {
+    return fields.get(fieldName);
+  }
+
+  /**
+   * Check if the EventRecord is a valid one, i.e., whether
+   * it represents meaningful metric values.
+   * 
+   * @return true if the EventRecord is a valid one, false otherwise.
+   */
+  public boolean isValid() {
+    return !("invalid".equalsIgnoreCase((String) fields.get("hostname")));
+  }
+
+  /**
+   * Creates and returns a string representation of the object.
+   * 
+   * @return a String representation of the object
+   */
+
+  public String toString() {
+    String retval = "";
+    ArrayList<String> keys = new ArrayList<String>(fields.keySet());
+    Collections.sort(keys);
+
+    for (int i = 0; i < keys.size(); i++) {
+      Object value = fields.get(keys.get(i));
+      if (value == null)
+        retval += keys.get(i) + ":\tnull\n";
+      else if (value instanceof String)
+        retval += keys.get(i) + ":\t" + value + "\n";
+      else if (value instanceof Calendar)
+        retval += keys.get(i) + ":\t" + ((Calendar) value).getTime() + "\n";
+      else if (value instanceof InetAddress[] || value instanceof String[]) {
+        retval += "Known IPs:\t";
+        // the ips field may hold either InetAddress[] or String[] values
+        for (Object ip : ((Object[]) value))
+          retval += (ip instanceof InetAddress ? ((InetAddress) ip).getHostAddress() : ip) + " ";
+        retval += "\n";
+      } else {
+        retval += keys.get(i) + ":\t" + value.toString() + "\n";
+      }
+    }
+    return retval;
+  }
+
+}
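
Below is a minimal, hypothetical usage sketch (not part of the patch) that exercises the EventRecord API defined above; the type, log level, source and message values are invented for illustration.

package org.apache.hadoop.contrib.failmon;

import java.net.InetAddress;
import java.util.Calendar;

public class EventRecordExample {
  public static void main(String[] args) throws Exception {
    // collect the same identification data that the parsers use
    String host = InetAddress.getLocalHost().getCanonicalHostName();
    InetAddress[] ips = InetAddress.getAllByName(InetAddress.getLocalHost().getHostName());

    EventRecord er = new EventRecord(host, ips, Calendar.getInstance(),
        "SystemLog", "ERROR", "kernel", "I/O error on device sda");
    er.set("diskDevice", "sda");             // attach an extra property
    System.out.println(er.isValid());        // true: hostname is not "invalid"
    System.out.println(er.get("logLevel"));  // ERROR
    System.out.print(er);                    // sorted key:value dump from toString()
  }
}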

+ 120 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Executor.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**********************************************************
+ * This class executes monitoring jobs on all nodes of the
+ * cluster, on which we intend to gather failure metrics. 
+ * It is basically a thread that sleeps and periodically wakes
+ * up to execute monitoring jobs and ship all gathered data to 
+ * a "safe" location, which in most cases will be the HDFS 
+ * filesystem of the monitored cluster.
+ * 
+ **********************************************************/
+
+public class Executor implements Runnable {
+
+  public static final int DEFAULT_LOG_INTERVAL = 3600;
+
+  public static final int DEFAULT_POLL_INTERVAL = 360;
+
+  public static int MIN_INTERVAL = 5;
+
+  public static int instances = 0;
+
+  LocalStore lstore;
+
+  ArrayList<MonitorJob> monitors;
+  
+  int interval;
+
+  int upload_interval;
+  int upload_counter;
+  
+  /**
+   * Create an instance of the class and read the configuration
+   * file to determine the set of jobs that will be run and the 
+   * maximum interval for which the thread can sleep before it 
+   * wakes up to execute a monitoring job on the node.
+   * 
+   */ 
+
+  public Executor(Configuration conf) {
+    
+    Environment.prepare("conf/failmon.properties");
+    
+    String localTmpDir;
+    
+    if (conf == null) {
+      // running as a stand-alone application
+      localTmpDir = System.getProperty("java.io.tmpdir");
+      Environment.setProperty("local.tmp.dir", localTmpDir);
+    } else {
+      // running from within Hadoop
+      localTmpDir = conf.get("hadoop.tmp.dir");
+      String hadoopLogPath = System.getProperty("hadoop.log.dir") + "/" + System.getProperty("hadoop.log.file");
+      Environment.setProperty("hadoop.log.file", hadoopLogPath);
+      Environment.setProperty("local.tmp.dir", localTmpDir);
+    }
+    
+    monitors = Environment.getJobs();
+    interval = Environment.getInterval(monitors);
+    upload_interval = LocalStore.UPLOAD_INTERVAL;
+    lstore = new LocalStore();
+    
+    if (Environment.getProperty("local.upload.interval") != null) 
+      upload_interval = Integer.parseInt(Environment.getProperty("local.upload.interval"));
+
+    instances++;
+  }
+
+  public void run() {
+    upload_counter = upload_interval;
+
+    Environment.logInfo("Failmon Executor thread started successfully.");
+    while (true) {
+      try {
+        Thread.sleep(interval * 1000);
+        for (int i = 0; i < monitors.size(); i++) {
+          monitors.get(i).counter -= interval;
+          if (monitors.get(i).counter <= 0) {
+            monitors.get(i).reset();
+            Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...\t");
+            monitors.get(i).job.monitor(lstore);
+          }
+        }
+        upload_counter -= interval;
+        if (upload_counter <= 0) {
+          lstore.upload();
+          upload_counter = upload_interval;
+        }
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public void cleanup() {
+    instances--;   
+  }
+}
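
A hypothetical launcher sketch for the Executor above (not part of the patch); passing a null Configuration selects stand-alone mode, while a real Hadoop Configuration selects in-Hadoop mode. The thread name is arbitrary.

package org.apache.hadoop.contrib.failmon;

public class ExecutorExample {
  public static void main(String[] args) {
    // null Configuration => stand-alone mode: local.tmp.dir becomes java.io.tmpdir
    Executor executor = new Executor(null);
    Thread worker = new Thread(executor, "failmon-executor");
    worker.start();
    // the thread now sleeps for the configured interval, runs every MonitorJob
    // whose counter has expired, and periodically uploads the LocalStore contents
  }
}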

+ 154 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HDFSMerger.java

@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class HDFSMerger {
+
+  Configuration hadoopConf;
+  FileSystem hdfs;
+  
+  String hdfsDir;
+  
+  FileStatus [] inputFiles;
+
+  Path outputFilePath;
+  FSDataOutputStream outputFile;
+    
+  boolean compress;
+
+  FileWriter fw;
+
+  BufferedWriter writer;
+
+  public HDFSMerger() throws IOException {
+
+    String hadoopConfPath; 
+
+    if (Environment.getProperty("hadoop.conf.path") == null)
+      hadoopConfPath = "../../../conf";
+    else
+      hadoopConfPath = Environment.getProperty("hadoop.conf.path");
+
+    // Read the configuration for the Hadoop environment
+    hadoopConf = new Configuration();
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
+    
+    // determine the local output file name
+    if (Environment.getProperty("local.tmp.filename") == null)
+      Environment.setProperty("local.tmp.filename", "failmon.dat");
+    
+    // determine the upload location
+    hdfsDir = Environment.getProperty("hdfs.upload.dir");
+    if (hdfsDir == null)
+      hdfsDir = "/failmon";
+
+    hdfs = FileSystem.get(hadoopConf);
+    
+    Path hdfsDirPath = new Path(hadoopConf.get("fs.default.name") + hdfsDir);
+
+    try {
+      if (!hdfs.getFileStatus(hdfsDirPath).isDir()) {
+        Environment.logInfo("HDFSMerger: Not an HDFS directory: " + hdfsDirPath.toString());
+        System.exit(0);
+      }
+    } catch (FileNotFoundException e) {
+      Environment.logInfo("HDFSMerger: Directory not found: " + hdfsDirPath.toString());
+    }
+
+    inputFiles = hdfs.listStatus(hdfsDirPath);
+
+    outputFilePath = new Path(hdfsDirPath.toString() + "/" + "merge-"
+			  + Calendar.getInstance().getTimeInMillis() + ".dat");
+    outputFile = hdfs.create(outputFilePath);
+    
+    for (FileStatus fstatus : inputFiles) {
+      appendFile(fstatus.getPath());
+      hdfs.delete(fstatus.getPath());
+    }
+
+    outputFile.close();
+
+    Environment.logInfo("HDFS file merging complete!");
+  }
+
+  private void appendFile (Path inputPath) throws IOException {
+    
+    FSDataInputStream anyInputFile = hdfs.open(inputPath);
+    InputStream inputFile;
+    byte buffer[] = new byte[4096];
+    
+    if (inputPath.toString().endsWith(LocalStore.COMPRESSION_SUFFIX)) {
+      // the file is compressed
+      inputFile = new ZipInputStream(anyInputFile);
+      ((ZipInputStream) inputFile).getNextEntry();
+    } else {
+      inputFile = anyInputFile;
+    }
+    
+    try {
+      int bytesRead = 0;
+      while ((bytesRead = inputFile.read(buffer)) > 0) {
+        outputFile.write(buffer, 0, bytesRead);
+      }
+    } catch (IOException e) {
+      Environment.logInfo("Error while copying file:" + inputPath.toString());
+    } finally {
+      inputFile.close();
+    }    
+  }
+
+  
+  public static void main(String [] args) {
+
+    Environment.prepare("./conf/failmon.properties");
+
+    try {
+      new HDFSMerger();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+  }
+}

+ 136 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/HadoopLogParser.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * An object of this class parses a Hadoop log file to create
+ * appropriate EventRecords. The log file can be that of a
+ * NameNode, JobTracker, DataNode or TaskTracker.
+ * 
+ **********************************************************/
+
+public class HadoopLogParser extends LogParser {
+
+  /**
+   * Create a new parser object and try to find the hostname
+   * of the node that generated the log
+   */
+  public HadoopLogParser(String fname) {
+    super(fname);
+    if ((dateformat = Environment.getProperty("log.hadoop.dateformat")) == null)
+      dateformat = "\\d{4}-\\d{2}-\\d{2}";
+    if ((timeformat = Environment.getProperty("log.hadoop.timeformat")) == null)
+      timeformat = "\\d{2}:\\d{2}:\\d{2}";
+    findHostname();
+  }
+
+  /**
+   * Parses one line of the log. If the line contains a valid 
+   * log entry, then an appropriate EventRecord is returned, after all
+   * relevant fields have been parsed.
+   *
+   *  @param line the log line to be parsed
+   *
+   *  @return the EventRecord representing the log entry of the line. If 
+   *  the line does not contain a valid log entry, then the EventRecord 
+   *  returned has isValid() = false. When the end-of-file has been reached,
+   *  null is returned to the caller.
+   */
+  public EventRecord parseLine(String line) throws IOException {
+    EventRecord retval = null;
+
+    if (line != null) {
+      // process line
+      String patternStr = "(" + dateformat + ")";
+      patternStr += "\\s+";
+      patternStr += "(" + timeformat + ")";
+      patternStr += ".{4}\\s(\\w*)\\s"; // for logLevel
+      patternStr += "\\s*([\\w+\\.?]+)"; // for source
+      patternStr += ":\\s+(.+)"; // for the message
+      Pattern pattern = Pattern.compile(patternStr);
+      Matcher matcher = pattern.matcher(line);
+
+      if (matcher.find(0) && matcher.groupCount() >= 5) {
+        retval = new EventRecord(hostname, ips, parseDate(matcher.group(1),
+            matcher.group(2)),
+	    "HadoopLog",
+	    matcher.group(3), // loglevel
+            matcher.group(4), // source
+            matcher.group(5)); // message
+      } else {
+        retval = new EventRecord();
+      }
+    }
+
+    return retval;
+  }
+
+  /**
+   * Parse a date found in the Hadoop log.
+   * 
+   * @return a Calendar representing the date
+   */
+  protected Calendar parseDate(String strDate, String strTime) {
+    Calendar retval = Calendar.getInstance();
+    // set date
+    String[] fields = strDate.split("-");
+    retval.set(Calendar.YEAR, Integer.parseInt(fields[0]));
+    retval.set(Calendar.MONTH, Integer.parseInt(fields[1]) - 1); // Calendar months are zero-based
+    retval.set(Calendar.DATE, Integer.parseInt(fields[2]));
+    // set time
+    fields = strTime.split(":");
+    retval.set(Calendar.HOUR_OF_DAY, Integer.parseInt(fields[0]));
+    retval.set(Calendar.MINUTE, Integer.parseInt(fields[1]));
+    retval.set(Calendar.SECOND, Integer.parseInt(fields[2]));
+    return retval;
+  }
+
+  /**
+   * Attempt to determine the hostname of the node that created the
+   * log file. This information can be found in the STARTUP_MSG lines 
+   * of the Hadoop log, which are emitted when the node starts.
+   * 
+   */
+  private void findHostname() {
+    String startupInfo = Environment.runCommand(
+        "grep --max-count=1 STARTUP_MSG:\\s*host " + file.getName()).toString();
+    Pattern pattern = Pattern.compile("\\s+(\\w+/.+)\\s+");
+    Matcher matcher = pattern.matcher(startupInfo);
+    if (matcher.find(0)) {
+      hostname = matcher.group(1).split("/")[0];
+      ips = new String[1];
+      ips[0] = matcher.group(1).split("/")[1];
+    }
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    return ("Hadoop Log Parser for file: " + file.getName());
+  }
+
+}
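
A hedged sketch (not part of the patch) of what parseLine() extracts from a single entry; the log path and the sample line are invented, but the line follows the default date and time formats assumed by the regular expressions above.

package org.apache.hadoop.contrib.failmon;

public class HadoopLogParserExample {
  public static void main(String[] args) throws Exception {
    // load failmon.properties so the optional date/time format overrides are visible
    Environment.prepare("conf/failmon.properties");

    HadoopLogParser parser = new HadoopLogParser("/var/log/hadoop/hadoop-namenode.log");
    EventRecord er = parser.parseLine(
        "2008-08-15 17:45:02,323 INFO org.apache.hadoop.dfs.DataNode: Starting Periodic block scanner.");

    if (er.isValid()) {
      System.out.println(er.get("logLevel"));  // INFO
      System.out.println(er.get("source"));    // org.apache.hadoop.dfs.DataNode
      System.out.println(er.get("message"));   // Starting Periodic block scanner.
    }
  }
}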

+ 268 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LocalStore.java

@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**********************************************************
+ * This class takes care of the temporary local storage of 
+ * gathered metrics before they get uploaded into HDFS. It writes 
+ * Serialized Records as lines in a temporary file and then 
+ * compresses and uploads it into HDFS.
+ * 
+ **********************************************************/
+
+public class LocalStore {
+
+  public final static char FIELD_SEPARATOR = '|';
+
+  public final static char RECORD_SEPARATOR = '\n';
+
+  public final static String COMPRESSION_SUFFIX = ".zip";
+
+  public final static int UPLOAD_INTERVAL = 600;
+
+  String filename;
+  String hdfsDir;
+
+  boolean compress;
+
+  FileWriter fw;
+
+  BufferedWriter writer;
+
+  /**
+   * Create an instance of the class and read the configuration
+   * file to determine some output parameters. Then, initiate the 
+   * structures needed for the buffered I/O (so that small appends
+   * can be handled efficiently).
+   * 
+   */ 
+
+  public LocalStore() {
+    // determine the local output file name
+    if (Environment.getProperty("local.tmp.filename") == null)
+      Environment.setProperty("local.tmp.filename", "failmon.dat");
+    
+    // local.tmp.dir has been set by the Executor
+    if (Environment.getProperty("local.tmp.dir") == null)
+      Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));
+    
+    filename = Environment.getProperty("local.tmp.dir") + "/" +
+      Environment.getProperty("local.tmp.filename");
+
+    // determine the upload location
+    hdfsDir = Environment.getProperty("hdfs.upload.dir");
+    if (hdfsDir == null)
+      hdfsDir = "/failmon";
+
+    // determine if compression is enabled
+    compress = true;
+    if ("false".equalsIgnoreCase(Environment
+        .getProperty("local.tmp.compression")))
+      compress = false;
+
+    try {
+      fw = new FileWriter(filename, true);
+      writer = new BufferedWriter(fw);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Insert an EventRecord to the local storage, after it
+   * gets serialized and anonymized.
+   * 
+   * @param er the EventRecord to be inserted
+   */ 
+  
+  public void insert(EventRecord er) {
+    SerializedRecord sr = new SerializedRecord(er);
+    try {
+      Anonymizer.anonymize(sr);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    append(sr);
+  }
+
+  /**
+   * Insert an array of EventRecords to the local storage, after they
+   * get serialized and anonymized.
+   * 
+   * @param ers the array of EventRecords to be inserted
+   */
+  public void insert(EventRecord[] ers) {
+    for (EventRecord er : ers)
+      insert(er);
+  }
+
+  private void append(SerializedRecord sr) {
+    try {
+      writer.write(pack(sr).toString());
+      writer.write(RECORD_SEPARATOR);
+      // writer.flush();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Pack a SerializedRecord into a StringBuffer of key:value pairs,
+   * separated by FIELD_SEPARATOR characters.
+   * 
+   * @param sr the SerializedRecord to be packed
+   * 
+   * @return a StringBuffer holding the packed representation of the record
+   */
+  public static StringBuffer pack(SerializedRecord sr) {
+    StringBuffer sb = new StringBuffer();
+
+    ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet());
+
+    if (sr.isValid())
+      SerializedRecord.arrangeKeys(keys);
+
+    for (int i = 0; i < keys.size(); i++) {
+      String value = sr.fields.get(keys.get(i));
+      sb.append(keys.get(i) + ":" + value);
+      sb.append(FIELD_SEPARATOR);
+    }
+    return sb;
+  }
+
+  /**
+   * Upload the local file store into HDFS, after 
+   * compressing it. Then a new local file is created 
+   * to serve as the temporary record store.
+   * 
+   */
+  public void upload() {
+    try {
+      writer.flush();
+      if (compress)
+        zipCompress(filename);
+      String remoteName = "failmon-";
+      if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames")))
+        remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-";
+      else
+        remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-"; 
+      remoteName += Calendar.getInstance().getTimeInMillis();
+      if (compress)
+        copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX);
+      else
+        copyToHDFS(filename, hdfsDir + "/" + remoteName);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // delete and re-open
+    try {
+      fw.close();
+      fw = new FileWriter(filename);
+      writer = new BufferedWriter(fw);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+  
+  /**
+   * Compress a text file using the ZIP compressing algorithm.
+   * 
+   * @param filename the path to the file to be compressed
+   */
+  public static void zipCompress(String filename) throws IOException {
+    FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX);
+    CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32());
+    ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum));
+    out.setComment("Failmon records.");
+
+    BufferedReader in = new BufferedReader(new FileReader(filename));
+    out.putNextEntry(new ZipEntry(new File(filename).getName()));
+    int c;
+    while ((c = in.read()) != -1)
+      out.write(c);
+    in.close();
+
+    out.finish();
+    out.close();
+  }
+
+  /**
+   * Copy a local file to HDFS
+   * 
+   * @param localFile the filename of the local file
+   * @param hdfsFile the HDFS filename to copy to
+   */
+  public static void copyToHDFS(String localFile, String hdfsFile) throws IOException {
+
+    String hadoopConfPath; 
+
+    if (Environment.getProperty("hadoop.conf.path") == null)
+      hadoopConfPath = "../../../conf";
+    else
+      hadoopConfPath = Environment.getProperty("hadoop.conf.path");
+
+    // Read the configuration for the Hadoop environment
+    Configuration hadoopConf = new Configuration();
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
+    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));
+
+    // System.out.println(hadoopConf.get("hadoop.tmp.dir"));
+    // System.out.println(hadoopConf.get("fs.default.name"));
+    FileSystem fs = FileSystem.get(hadoopConf);
+
+    // HadoopDFS deals with Path
+    Path inFile = new Path("file://" + localFile);
+    Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile);
+
+     // Read from and write to new file
+    Environment.logInfo("Uploading to HDFS (file " + outFile + ") ...");
+    fs.copyFromLocalFile(false, inFile, outFile);
+  }
+
+  /**
+   * Close the temporary local file
+   * 
+   */ 
+  public void close() {
+    try {
+      writer.flush();
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}
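
A hypothetical round trip through LocalStore (not part of the patch), mirroring what the Executor and parsers above do; it assumes a readable conf/failmon.properties and, for upload(), access to the configured HDFS.

package org.apache.hadoop.contrib.failmon;

import java.net.InetAddress;
import java.util.Calendar;

public class LocalStoreExample {
  public static void main(String[] args) throws Exception {
    Environment.prepare("conf/failmon.properties");
    Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));

    EventRecord er = new EventRecord(
        InetAddress.getLocalHost().getCanonicalHostName(),
        InetAddress.getAllByName(InetAddress.getLocalHost().getHostName()),
        Calendar.getInstance(), "Example", "INFO", "sketch", "hello failmon");

    LocalStore store = new LocalStore();  // opens <local.tmp.dir>/<local.tmp.filename>
    store.insert(er);                     // serialize, anonymize and append the record
    store.upload();                       // flush, optionally compress, copy to hdfs.upload.dir
    store.close();                        // flush and close the temporary file
  }
}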

+ 214 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/LogParser.java

@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Calendar;
+
+/**********************************************************
+ * This class represents objects that provide log parsing 
+ * functionality. Typically, such objects read log files line
+ * by line and for each log entry they identify, they create a 
+ * corresponding EventRecord. In this way, disparate log files
+ * can be merged using the uniform format of EventRecords and can,
+ * thus, be processed in a uniform way.
+ * 
+ **********************************************************/
+
+public abstract class LogParser implements Monitored {
+
+  File file;
+
+  BufferedReader reader;
+
+  String hostname;
+
+  Object [] ips;
+
+  String dateformat;
+
+  String timeformat;
+
+  private String firstLine;
+  private long offset;
+
+  /**
+   * Create a parser that will read from the specified log file.
+   * 
+   * @param fname the filename of the log file to be read
+   */
+  public LogParser(String fname) {
+    file = new File(fname);
+
+    ParseState ps = PersistentState.getState(file.getAbsolutePath());
+    firstLine = ps.firstLine;
+    offset = ps.offset;
+    
+    try {
+      reader = new BufferedReader(new FileReader(file));
+      checkForRotation();
+      Environment.logInfo("Checked for rotation...");
+      reader.skip(offset);
+    } catch (FileNotFoundException e) {
+      System.err.println(e.getMessage());
+      e.printStackTrace();
+    } catch (IOException e) {
+      System.err.println(e.getMessage());
+      e.printStackTrace();
+    }
+
+    setNetworkProperties();
+  }
+
+  protected void setNetworkProperties() {
+    // determine hostname and ip addresses for the node
+    try {
+      // Get hostname
+      hostname = InetAddress.getLocalHost().getCanonicalHostName();
+      // Get all associated ip addresses
+      ips = InetAddress.getAllByName(hostname);
+
+    } catch (UnknownHostException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Insert all EventRecords that can be extracted for
+   * the represented hardware component into a LocalStore.
+   * 
+   * @param ls the LocalStore into which the EventRecords 
+   * are to be stored.
+   */
+  public void monitor(LocalStore ls) {
+    int in = 0;
+    EventRecord er = null;
+    Environment.logInfo("Started processing log...");
+
+    while ((er = getNext()) != null) {
+      // Environment.logInfo("Processing log line:\t" + in++);
+      if (er.isValid()) {
+        ls.insert(er);
+      }
+    }
+
+    PersistentState.updateState(file.getAbsolutePath(), firstLine, offset);
+    PersistentState.writeState("conf/parsing.state");
+  }
+
+  /**
+   * Get an array of all EventRecords that can be extracted for
+   * the represented hardware component.
+   * 
+   * @return The array of EventRecords
+   */
+  public EventRecord[] monitor() {
+
+    ArrayList<EventRecord> recs = new ArrayList<EventRecord>();
+    EventRecord er;
+
+    while ((er = getNext()) != null)
+      recs.add(er);
+
+    EventRecord[] T = new EventRecord[recs.size()];
+
+    return recs.toArray(T);
+  }
+
+  /**
+   * Continue parsing the log file until a valid log entry is identified.
+   * When one such entry is found, parse it and return a corresponding EventRecord.
+   * 
+   *  
+   * @return The EventRecord corresponding to the next log entry
+   */
+  public EventRecord getNext() {
+    try {
+      String line = reader.readLine();
+      if (line != null) {
+        if (firstLine == null)
+          firstLine = new String(line);
+        offset += line.length() + 1;
+        return parseLine(line);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  /**
+   * Return the BufferedReader, that reads the log file
+   *  
+   * @return The BufferedReader that reads the log file
+   */
+  public BufferedReader getReader() {
+    return reader;
+  }
+
+  /**
+   * Check whether the log file has been rotated. If so,
+   * start reading the file from the beginning.
+   *  
+   */
+  public void checkForRotation() {
+    try {
+      BufferedReader probe = new BufferedReader(new FileReader(file.getAbsoluteFile()));
+      if (firstLine == null || (!firstLine.equals(probe.readLine()))) {
+        probe.close();
+        // start reading the file from the beginning
+        reader.close();
+        reader = new BufferedReader(new FileReader(file.getAbsoluteFile()));
+        firstLine = null;
+        offset = 0;
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Parses one line of the log. If the line contains a valid 
+   * log entry, then an appropriate EventRecord is returned, after all
+   * relevant fields have been parsed.
+   *
+   *  @param line the log line to be parsed
+   *
+   *  @return the EventRecord representing the log entry of the line. If 
+   *  the line does not contain a valid log entry, then the EventRecord 
+   *  returned has isValid() = false. When the end-of-file has been reached,
+   *  null is returned to the caller.
+   */
+  abstract public EventRecord parseLine(String line) throws IOException;
+
+  /**
+   * Parse a date found in the log file.
+   * 
+   * @return a Calendar representing the date
+   */
+  abstract protected Calendar parseDate(String strDate, String strTime);
+
+}
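
A hypothetical minimal LogParser subclass (not part of the patch) illustrating the parseLine()/parseDate() contract described above; the expected line format "<date> <time> <level> <message>" and the field values are invented.

package org.apache.hadoop.contrib.failmon;

import java.util.Calendar;

public class SimpleLogParser extends LogParser {

  public SimpleLogParser(String fname) {
    super(fname);
  }

  public EventRecord parseLine(String line) {
    // expects lines like "2008-08-15 17:45:02 WARN something happened"
    String[] tokens = line.split("\\s+", 4);
    if (tokens.length < 4)
      return new EventRecord();               // invalid record, isValid() == false
    return new EventRecord(hostname, ips, parseDate(tokens[0], tokens[1]),
        "SimpleLog", tokens[2], "syslog", tokens[3]);
  }

  protected Calendar parseDate(String strDate, String strTime) {
    String[] d = strDate.split("-");
    String[] t = strTime.split(":");
    Calendar cal = Calendar.getInstance();
    // Calendar months are zero-based, hence the -1
    cal.set(Integer.parseInt(d[0]), Integer.parseInt(d[1]) - 1, Integer.parseInt(d[2]),
        Integer.parseInt(t[0]), Integer.parseInt(t[1]), Integer.parseInt(t[2]));
    return cal;
  }

  public String getInfo() {
    return "Simple log parser for file: " + file.getName();
  }
}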

+ 43 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/MonitorJob.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+/**********************************************************
+ * This class is a wrapper for a monitoring job. 
+ * 
+ **********************************************************/
+
+public class MonitorJob {
+  Monitored job;
+
+  String type;
+  int interval;
+  int counter;
+
+  public MonitorJob(Monitored _job, String _type, int _interval) {
+    job = _job;
+    type = _type;
+    interval = _interval;
+    counter = _interval;
+  }
+
+  public void reset() {
+    counter = interval;
+  }
+}

+ 53 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/Monitored.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+/**********************************************************
+ * Represents objects that monitor specific hardware resources and
+ * can query them to get EventRecords describing the state of these
+ * resources.
+ *
+ **********************************************************/
+
+public interface Monitored {
+  /**
+   * Get an array of all EventRecords that can be extracted for
+   * the represented hardware component.
+   * 
+   * @return The array of EventRecords
+   */
+  public EventRecord[] monitor();
+  
+  /**
+   * Inserts all EventRecords that can be extracted for
+   * the represented hardware component into a LocalStore.
+   * 
+   * @param ls the LocalStore into which the EventRecords 
+   * are to be stored.
+   */
+  public void monitor(LocalStore ls);
+  
+  /**
+   * Return a String with information about the implementing
+   * class 
+   * 
+   * @return A String describing the implementing class
+   */
+  public String getInfo();
+}

+ 140 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/NICParser.java

@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Calendar;
+
+/**********************************************************
+ * Objects of this class parse the output of ifconfig to 
+ * gather information about present Network Interface Cards
+ * in the system. The list of NICs to poll is specified in the 
+ * configuration file.
+ * 
+ **********************************************************/
+
+
+public class NICParser extends ShellParser {
+
+  String[] nics;
+
+  /**
+   * Constructs a NICParser and reads the list of NICs to query
+   */
+  public NICParser() {
+    super();
+    nics = Environment.getProperty("nic.list").split(",\\s*");
+  }
+
+  /**
+   * Reads and parses the output of ifconfig for a specified NIC and 
+   * creates an appropriate EventRecord that holds the desirable 
+   * information for it.
+   * 
+   * @param device the NIC device name to query
+   * 
+   * @return the EventRecord created
+   */
+  public EventRecord query(String device) throws UnknownHostException {
+    StringBuffer sb = Environment.runCommand("/sbin/ifconfig " + device);
+    EventRecord retval = new EventRecord(InetAddress.getLocalHost()
+        .getCanonicalHostName(), InetAddress.getAllByName(InetAddress.getLocalHost()
+        .getHostName()), Calendar.getInstance(), "NIC", "Unknown", device, "-");
+
+    retval.set("hwAddress", findPattern("HWaddr\\s*([\\S{2}:]{17})", sb
+        .toString(), 1));
+
+    retval.set("ipAddress", findPattern("inet\\s+addr:\\s*([\\w.?]*)", sb
+        .toString(), 1));
+
+    String tmp = findPattern("inet\\s+addr:\\s*([\\w.?]*)", sb.toString(), 1);
+    retval.set("status", (tmp == null) ? "DOWN" : "UP");
+    if (tmp != null)
+      retval.set("ipAddress", tmp);
+
+    retval.set("rxPackets", findPattern("RX\\s*packets\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("rxErrors", findPattern("RX.+errors\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("rxDropped", findPattern("RX.+dropped\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("rxOverruns", findPattern("RX.+overruns\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("rxFrame", findPattern("RX.+frame\\s*:\\s*(\\d+)",
+        sb.toString(), 1));
+
+    retval.set("txPackets", findPattern("TX\\s*packets\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("txErrors", findPattern("TX.+errors\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("txDropped", findPattern("TX.+dropped\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("txOverruns", findPattern("TX.+overruns\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("txCarrier", findPattern("TX.+carrier\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+
+    retval.set("collisions", findPattern("\\s+collisions\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+
+    retval.set("rxBytes", findPattern("RX\\s*bytes\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+    retval.set("txBytes", findPattern("TX\\s*bytes\\s*:\\s*(\\d+)", sb
+        .toString(), 1));
+
+    return retval;
+  }
+
+  /**
+   * Invokes query() to do the parsing and handles parsing errors for 
+   * each one of the NICs specified in the configuration. 
+   * 
+   * @return an array of EventRecords that holds one element that represents
+   * the current state of network interfaces.
+   */
+  public EventRecord[] monitor() {
+    ArrayList<EventRecord> recs = new ArrayList<EventRecord>();
+
+    for (String nic : nics) {
+      try {
+        recs.add(query(nic));
+      } catch (UnknownHostException e) {
+        e.printStackTrace();
+      }
+    }
+
+    EventRecord[] T = new EventRecord[recs.size()];
+
+    return recs.toArray(T);
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    String retval = "ifconfig parser for interfaces: ";
+    for (String nic : nics)
+      retval += nic + " ";
+    return retval;
+  }
+}
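
A hypothetical sketch (not part of the patch) of polling the interfaces listed in the nic.list property; the interface names below are examples only.

package org.apache.hadoop.contrib.failmon;

public class NICParserExample {
  public static void main(String[] args) {
    Environment.prepare("conf/failmon.properties");
    Environment.setProperty("nic.list", "eth0, lo");  // example interfaces

    NICParser nics = new NICParser();
    for (EventRecord er : nics.monitor())
      System.out.print(er);   // status, ipAddress, rxBytes, txErrors, ...
  }
}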

+ 132 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/OfflineAnonymizer.java

@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+
+/**********************************************************
+ * This class can be used to anonymize logs independently of
+ * Hadoop and the Executor. It parses the specified log file to
+ * create log records for it and then passes them to the Anonymizer.
+ * After they are anonymized, they are written to a local file,
+ * which is then compressed and stored locally.
+ * 
+ **********************************************************/
+
+public class OfflineAnonymizer {
+
+  public enum LogType {
+    HADOOP, SYSTEM
+  };
+
+  LogType logtype;
+
+  File logfile;
+
+  LogParser parser;
+
+  /**
+   * Creates an OfflineAnonymizer for a specific log file.
+   * 
+   * @param logtype the type of the log file. This can either be
+   * LogType.HADOOP or LogType.SYSTEM
+   * @param filename the path to the log file
+   * 
+   */  
+  public OfflineAnonymizer(LogType logtype, String filename) {
+
+    logfile = new File(filename);
+
+    if (!logfile.exists()) {
+      System.err.println("Input file does not exist!");
+      System.exit(0);
+    }
+
+    if (logtype == LogType.HADOOP)
+      parser = new HadoopLogParser(filename);
+    else
+      parser = new SystemLogParser(filename);
+  }
+
+  /**
+   * Performs anonymization for the log file. Log entries are
+   * read one by one and EventRecords are created, which are then
+   * anonymized and written to the output.
+   * 
+   */
+  public void anonymize() throws Exception {
+    EventRecord er = null;
+    SerializedRecord sr = null;
+
+    BufferedWriter bfw = new BufferedWriter(new FileWriter(logfile.getName()
+        + ".anonymized"));
+
+    System.out.println("Anonymizing log records...");
+    while ((er = parser.getNext()) != null) {
+      if (er.isValid()) {
+        sr = new SerializedRecord(er);
+        Anonymizer.anonymize(sr);
+        bfw.write(LocalStore.pack(sr).toString());
+        bfw.write(LocalStore.RECORD_SEPARATOR);
+      }
+    }
+    bfw.flush();
+    bfw.close();
+    System.out.println("Anonymized log records written to " + logfile.getName()
+        + ".anonymized");
+
+    System.out.println("Compressing output file...");
+    LocalStore.zipCompress(logfile.getName() + ".anonymized");
+    System.out.println("Compressed output file written to " + logfile.getName()
+        + ".anonymized" + LocalStore.COMPRESSION_SUFFIX);
+  }
+
+  public static void main(String[] args) {
+
+    if (args.length < 2) {
+      System.out.println("Usage: OfflineAnonymizer <log_type> <filename>");
+      System.out
+          .println("where <log_type> is either \"hadoop\" or \"system\" and <filename> is the path to the log file");
+      System.exit(0);
+    }
+
+    LogType logtype = null;
+
+    if (args[0].equalsIgnoreCase("-hadoop"))
+      logtype = LogType.HADOOP;
+    else if (args[0].equalsIgnoreCase("-system"))
+      logtype = LogType.SYSTEM;
+    else {
+      System.err.println("Invalid first argument.");
+      System.exit(0);
+    }
+
+    OfflineAnonymizer oa = new OfflineAnonymizer(logtype, args[1]);
+
+    try {
+      oa.anonymize();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return;
+  }
+}
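
A hypothetical programmatic equivalent (not part of the patch) of running "OfflineAnonymizer -hadoop <logfile>" from the command line; the log path is invented.

package org.apache.hadoop.contrib.failmon;

public class OfflineAnonymizerExample {
  public static void main(String[] args) throws Exception {
    OfflineAnonymizer oa = new OfflineAnonymizer(
        OfflineAnonymizer.LogType.HADOOP, "/var/log/hadoop/hadoop-namenode.log");
    oa.anonymize();  // writes <name>.anonymized plus a compressed .zip copy
  }
}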

+ 163 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/PersistentState.java

@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.util.Properties;
+import java.util.Calendar;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+/**********************************************************
+ * This class takes care of the information that needs to be
+ * persistently stored locally on nodes. Bookkeeping is done for the
+ * state of parsing of log files, so that the portion of the file that
+ * has already been parsed in previous calls will not be parsed again.
+ * For each log file, we maintain the byte offset of the last
+ * character parsed in previous passes. Also, the first entry in the
+ * log file is stored, so that FailMon can determine when a log file
+ * has been rotated (and thus parsing needs to start from the
+ * beginning of the file). We use a property file to store that
+ * information. For each log file we create a property keyed by the
+ * filename, the value of which contains the byte offset and first log
+ * entry separated by a SEPARATOR.
+ * 
+ **********************************************************/
+
+public class PersistentState {
+
+  private final static String SEPARATOR = "###";
+  
+  static String filename;
+  static Properties persData = new Properties();
+  
+  /**
+   * Read the state of parsing for all open log files from a property
+   * file.
+   * 
+   * @param fname the filename of the property file to be read
+   */
+
+  public static void readState(String fname) {
+
+    filename = fname;
+    
+    try {
+      persData.load(new FileInputStream(filename));
+    } catch (FileNotFoundException e1) {
+      // ignore
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+   /**
+   * Read and return the state of parsing for a particular log file.
+   * 
+   * @param fname the log file for which to read the state
+   */
+  public static ParseState getState(String fname) {
+    String [] fields = persData.getProperty(fname, "null" + SEPARATOR + "0").split(SEPARATOR, 2);
+    String firstLine;
+    long offset;
+    
+    if (fields.length < 2) {
+      System.err.println("Malformed persistent state data found");
+      Environment.logInfo("Malformed persistent state data found");
+      firstLine = null;
+      offset = 0;
+    } else {
+      firstLine = (fields[0].equals("null") ? null : fields[0]);
+      offset = Long.parseLong(fields[1]);
+    }
+
+    return new ParseState(fname, firstLine, offset);
+  }
+
+  /**
+   * Set the state of parsing for a particular log file.
+   * 
+   * @param state the ParseState to set
+   */
+  public static void setState(ParseState state) {
+
+    if (state == null) {
+      System.err.println("Null state found");
+      Environment.logInfo("Null state found");
+    }
+
+    persData.setProperty(state.filename, state.firstLine + SEPARATOR + state.offset);
+  }
+
+  /**
+   * Update the state of parsing for a particular log file.
+   * 
+   * @param filename the log file for which to update the state
+   * @param firstLine the first line of the log file currently
+   * @param offset the byte offset of the last character parsed
+   */ 
+  public static void updateState(String filename, String firstLine, long offset) {
+
+    ParseState ps = getState(filename);
+
+    if (firstLine != null)
+      ps.firstLine = firstLine;
+
+    ps.offset = offset;
+
+    setState(ps);
+  }
+
+  /**
+   * Write the state of parsing for all open log files to a property
+   * file on disk.
+   * 
+   * @param fname the filename of the property file to write to
+   */
+  public static void writeState(String fname) {
+    try {
+      persData.store(new FileOutputStream(fname), Calendar.getInstance().getTime().toString());
+    } catch (FileNotFoundException e1) {
+      e1.printStackTrace();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+  
+}
+
+/**********************************************************
+ * This class represents the state of parsing for a particular log
+ * file.
+ * 
+ **********************************************************/
+
+class ParseState {
+
+  public String filename;
+  public String firstLine;
+  public long offset;
+
+  public ParseState(String _filename, String _firstLine, long _offset) {
+    this.filename = _filename;
+    this.firstLine = _firstLine;
+    this.offset = _offset;
+  }
+}
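
A hypothetical sketch (not part of the patch) of the bookkeeping round trip described above, where each property maps a log path to "<first line>###<byte offset>"; the log path and offset increment are invented.

package org.apache.hadoop.contrib.failmon;

public class PersistentStateExample {
  public static void main(String[] args) {
    PersistentState.readState("conf/parsing.state");

    ParseState ps = PersistentState.getState("/var/log/messages");
    System.out.println(ps.firstLine + " @ byte " + ps.offset);

    // record that another 4096 bytes of the file have been parsed
    PersistentState.updateState("/var/log/messages", ps.firstLine, ps.offset + 4096);
    PersistentState.writeState("conf/parsing.state");
  }
}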

+ 120 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/RunOnce.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.util.ArrayList;
+
+/**********************************************************
+ * Runs a set of monitoring jobs once for the local node. The set of
+ * jobs to be run is the intersection of the jobs specified in the
+ * configuration file and the set of jobs specified in the --only
+ * command line argument.
+ **********************************************************/ 
+
+public class RunOnce {
+
+  LocalStore lstore;
+
+  ArrayList<MonitorJob> monitors;
+  
+  boolean uploading = true;
+  
+  public RunOnce(String confFile) {
+    
+    Environment.prepare(confFile);
+    
+    String localTmpDir;
+    
+    // running as a stand-alone application
+    localTmpDir = System.getProperty("java.io.tmpdir");
+    Environment.setProperty("local.tmp.dir", localTmpDir);
+        
+    monitors = Environment.getJobs();
+    lstore = new LocalStore();
+    uploading  = true;
+  }
+
+  private void filter (String [] ftypes) {
+    ArrayList<MonitorJob> filtered = new ArrayList<MonitorJob>();
+    boolean found;
+    
+    // filter out unwanted monitor jobs
+    for (MonitorJob job : monitors) {
+      found = false;
+      for (String ftype : ftypes)
+        if (job.type.equalsIgnoreCase(ftype))
+          found = true;
+      if (found)
+        filtered.add(job);
+    }
+
+    // disable uploading if not requested
+    found = false;
+    for (String ftype : ftypes)
+      if (ftype.equalsIgnoreCase("upload"))
+        found = true;
+
+    if (!found)
+      uploading = false;
+    
+    monitors = filtered;
+  }
+  
+  private void run() {
+    
+    Environment.logInfo("Failmon started successfully.");
+
+    for (int i = 0; i < monitors.size(); i++) {
+      Environment.logInfo("Calling " + monitors.get(i).job.getInfo() + "...\t");
+      monitors.get(i).job.monitor(lstore);
+    }
+
+    if (uploading)
+      lstore.upload();
+
+    lstore.close();
+  }
+
+  public void cleanup() {
+    // nothing to be done
+  }
+
+  
+  public static void main (String [] args) {
+
+    String configFilePath = "./conf/failmon.properties";
+    String [] onlyList = null;
+    
+    // Parse command-line parameters
+    for (int i = 0; i < args.length - 1; i++) {
+      if (args[i].equalsIgnoreCase("--config"))
+        configFilePath = args[i + 1];
+      else if (args[i].equalsIgnoreCase("--only"))
+        onlyList = args[i + 1].split(",");
+    }
+
+    RunOnce ro = new RunOnce(configFilePath);
+    // only keep the requested types of jobs
+    if (onlyList != null)
+      ro.filter(onlyList);
+    // run once only
+    ro.run();
+  }
+
+}
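
A hypothetical sketch (not part of the patch) that invokes RunOnce exactly as the command line would; the job type names passed to --only are placeholders, "upload" being the only value the code above treats specially.

package org.apache.hadoop.contrib.failmon;

public class RunOnceExample {
  public static void main(String[] args) {
    // equivalent to: RunOnce --config ./conf/failmon.properties --only hadoopLog,upload
    RunOnce.main(new String[] {
        "--config", "./conf/failmon.properties",
        "--only", "hadoopLog,upload"});
  }
}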

+ 206 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SMARTParser.java

@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * Objects of this class parse the output of smartmontools to 
+ * gather information about the state of disks in the system. The
+ * smartmontools utility reads the S.M.A.R.T. attributes from
+ * the disk devices and reports them to the user. Note that since
+ * running smartctl requires superuser privileges, one should
+ * grant sudo privileges to the running user for the command smartctl
+ * (without a password). Alternatively, one can set up a cron job that 
+ * periodically dumps the output of smartctl into a user-readable file.
+ * See the configuration file for details.
+ *
+ **********************************************************/
+
+public class SMARTParser extends ShellParser {
+
+  String[] devices;
+
+  /**
+   * Constructs a SMARTParser and reads the list of disk 
+   * devices to query
+   */
+  public SMARTParser() {
+    super();
+    String devicesStr = Environment.getProperty("disks.list");
+    System.out.println("skato " + devicesStr);
+    if (devicesStr != null)
+      devices = devicesStr.split(",\\s*");
+  }
+
+  /**
+   * Reads and parses the output of smartctl for a specified disk and 
+   * creates an appropriate EventRecord that holds the desirable 
+   * information for it. Since the output of smartctl is different for 
+   * different kinds of disks, we try to identify as many attributes as 
+   * possible for all known output formats. 
+   * 
+   * @param device the disk device name to query
+   * 
+   * @return the EventRecord created
+   */
+  public EventRecord query(String device) throws Exception {
+    String conf = Environment.getProperty("disks." + device + ".source");
+    StringBuffer sb;
+
+    if (conf == null)
+      sb = Environment.runCommand("sudo smartctl --all " + device);
+    else
+      sb = Environment.runCommand("cat " + conf);
+
+    EventRecord retval = new EventRecord(InetAddress.getLocalHost()
+        .getCanonicalHostName(), InetAddress.getAllByName(InetAddress.getLocalHost()
+        .getHostName()), Calendar.getInstance(), "SMART", "Unknown",
+        (conf == null ? "sudo smartctl --all " + device : "file " + conf), "-");
+    // IBM SCSI disks
+    retval.set("model", findPattern("Device\\s*:\\s*(.*)", sb.toString(), 1));
+    retval.set("serial", findPattern("Serial\\s+Number\\s*:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("firmware", findPattern("Firmware\\s+Version\\s*:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("capacity", findPattern("User\\s+Capacity\\s*:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("status", findPattern("SMART\\s*Health\\s*Status:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("current_temperature", findPattern(
+        "Current\\s+Drive\\s+Temperature\\s*:\\s*(.*)", sb.toString(), 1));
+    retval.set("trip_temperature", findPattern(
+        "Drive\\s+Trip\\s+Temperature\\s*:\\s*(.*)", sb.toString(), 1));
+    retval.set("start_stop_count", findPattern(
+        "start\\s+stop\\s+count\\s*:\\s*(\\d*)", sb.toString(), 1));
+
+    String[] var = { "read", "write", "verify" };
+    for (String s : var) {
+      retval.set(s + "_ecc_fast", findPattern(s + "\\s*:\\s*(\\d*)", sb
+          .toString(), 1));
+      retval.set(s + "_ecc_delayed", findPattern(s
+          + "\\s*:\\s*(\\d+\\s+){1}(\\d+)", sb.toString(), 2));
+      retval.set(s + "_rereads", findPattern(
+          s + "\\s*:\\s*(\\d+\\s+){2}(\\d+)", sb.toString(), 2));
+      retval.set(s + "_GBs", findPattern(s
+          + "\\s*:\\s*(\\d+\\s+){5}(\\d+\\.?\\d*)", sb.toString(), 2));
+      retval.set(s + "_uncorrected",
+          findPattern(s + "\\s*:\\s*(\\d+\\s+){5}(\\d+\\.?\\d*){1}\\s+(\\d+)", sb
+              .toString(), 3));
+    }
+
+    // Hitachi IDE, SATA
+    retval.set("model", findPattern("Device\\s*Model\\s*:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("serial", findPattern("Serial\\s+number\\s*:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("protocol", findPattern("Transport\\s+protocol\\s*:\\s*(.*)", sb
+        .toString(), 1));
+    retval.set("status", "PASSED".equalsIgnoreCase(findPattern(
+        "test\\s*result\\s*:\\s*(.*)", sb.toString(), 1)) ? "OK" : "FAILED");
+
+    readColumns(retval, sb);
+
+    return retval;
+  }
+
+  /**
+   * Reads attributes in the following format:
+   * 
+   * ID# ATTRIBUTE_NAME          FLAG     VALUE WORST THRESH TYPE      UPDATED  WHEN_FAILED RAW_VALUE
+   * 3 Spin_Up_Time             0x0027   180   177   063    Pre-fail  Always       -       10265
+   * 4 Start_Stop_Count         0x0032   253   253   000    Old_age   Always       -       34
+   * 5 Reallocated_Sector_Ct    0x0033   253   253   063    Pre-fail  Always       -       0
+   * 6 Read_Channel_Margin      0x0001   253   253   100    Pre-fail  Offline      -       0
+   * 7 Seek_Error_Rate          0x000a   253   252   000    Old_age   Always       -       0
+   * 8 Seek_Time_Performance    0x0027   250   224   187    Pre-fail  Always       -       53894
+   * 9 Power_On_Minutes         0x0032   210   210   000    Old_age   Always       -       878h+00m
+   * 10 Spin_Retry_Count        0x002b   253   252   157    Pre-fail  Always       -       0
+   * 11 Calibration_Retry_Count 0x002b   253   252   223    Pre-fail  Always       -       0
+   * 12 Power_Cycle_Count       0x0032   253   253   000    Old_age   Always       -       49
+   * 192 PowerOff_Retract_Count 0x0032   253   253   000    Old_age   Always       -       0
+   * 193 Load_Cycle_Count       0x0032   253   253   000    Old_age   Always       -       0
+   * 194 Temperature_Celsius    0x0032   037   253   000    Old_age   Always       -       37
+   * 195 Hardware_ECC_Recovered 0x000a   253   252   000    Old_age   Always       -       2645
+   * 
+   * This format is mostly found in IDE and SATA disks.
+   * 
+   * @param er the EventRecord in which to store attributes found
+   * @param sb the StringBuffer with the text to parse
+   * 
+   * @return the EventRecord in which new attributes are stored.
+   */
+  private EventRecord readColumns(EventRecord er, StringBuffer sb) {
+
+    Pattern pattern = Pattern.compile("^\\s{0,2}(\\d{1,3}\\s+.*)$",
+        Pattern.MULTILINE);
+    Matcher matcher = pattern.matcher(sb);
+
+    while (matcher.find()) {
+      String[] tokens = matcher.group(1).split("\\s+");
+      boolean failed = false;
+      // check if this attribute is a failed one
+      if (!tokens[8].equals("-"))
+        failed = true;
+      er.set(tokens[1].toLowerCase(), (failed ? "FAILED:" : "") + tokens[9]);
+    }
+
+    return er;
+  }
+
+  /**
+   * Invokes query() to do the parsing and handles parsing errors for 
+   * each one of the disks specified in the configuration. 
+   * 
+   * @return an array of EventRecords that holds one element that represents
+   * the current state of the disk devices.
+   */
+  public EventRecord[] monitor() {
+    ArrayList<EventRecord> recs = new ArrayList<EventRecord>();
+
+    for (String device : devices) {
+      try {
+        recs.add(query(device));
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    EventRecord[] T = new EventRecord[recs.size()];
+
+    return recs.toArray(T);
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    String retval = "S.M.A.R.T. disk attributes parser for disks ";
+    for (String device : devices)
+      retval += device + " ";
+    return retval;
+  }
+
+}
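
For illustration: a minimal, self-contained sketch of the attribute-table parsing that readColumns() performs on smartctl output. The sample rows and the SmartColumnsDemo class name are assumptions for the example and are not part of the FailMon sources.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SmartColumnsDemo {
      public static void main(String[] args) {
        // Two rows in the ID# / ATTRIBUTE_NAME / ... / RAW_VALUE format shown above (sample data).
        String sample =
            "  5 Reallocated_Sector_Ct    0x0033   253   253   063    Pre-fail  Always       -       0\n"
          + "194 Temperature_Celsius      0x0032   037   253   000    Old_age   Always       -       37\n";

        // Same pattern readColumns() uses: an attribute row starts with a 1-3 digit ID.
        Pattern pattern = Pattern.compile("^\\s{0,2}(\\d{1,3}\\s+.*)$", Pattern.MULTILINE);
        Matcher matcher = pattern.matcher(sample);

        while (matcher.find()) {
          String[] tokens = matcher.group(1).split("\\s+");
          // Column 8 (WHEN_FAILED) is "-" unless the attribute has tripped.
          boolean failed = !tokens[8].equals("-");
          System.out.println(tokens[1].toLowerCase() + " = "
              + (failed ? "FAILED:" : "") + tokens[9]);
        }
        // Prints:
        //   reallocated_sector_ct = 0
        //   temperature_celsius = 37
      }
    }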

+ 112 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SensorsParser.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.Calendar;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * Objects of this class parse the output of the lm-sensors utility 
+ * to gather information about fan speed, temperatures for cpus
+ * and motherboard etc.
+ *
+ **********************************************************/
+
+public class SensorsParser extends ShellParser {
+
+  /**
+   * Reads and parses the output of the 'sensors' command 
+   * and creates an appropriate EventRecord that holds 
+   * the desirable information.
+   * 
+   * @param s unused parameter
+   * 
+   * @return the EventRecord created
+   */
+  public EventRecord query(String s) throws Exception {
+    StringBuffer sb;
+
+    sb = Environment.runCommand("sensors -A");
+    // alternatively, read a pre-captured dump: Environment.runCommand("cat sensors.out");
+
+    EventRecord retval = new EventRecord(InetAddress.getLocalHost()
+        .getCanonicalHostName(), InetAddress.getAllByName(InetAddress.getLocalHost()
+        .getHostName()), Calendar.getInstance(), "lm-sensors", "Unknown",
+        "sensors -A", "-");
+    readGroup(retval, sb, "fan");
+    readGroup(retval, sb, "in");
+    readGroup(retval, sb, "temp");
+    readGroup(retval, sb, "Core");
+
+    return retval;
+  }
+
+  /**
+   * Reads and parses lines that provide the output
+   * of a group of sensors with the same functionality.
+   * 
+   * @param er the EventRecord to which the new attributes are added
+   * @param sb the text to parse
+   * @param prefix a String prefix specifying the common prefix of the
+   * sensors' names in the group (e.g. "fan", "in", "temp")
+   * 
+   * @return the EventRecord created
+   */
+  private EventRecord readGroup(EventRecord er, StringBuffer sb, String prefix) {
+
+    Pattern pattern = Pattern.compile(".*(" + prefix
+        + "\\s*\\d*)\\s*:\\s*(\\+?\\d+)", Pattern.MULTILINE);
+    Matcher matcher = pattern.matcher(sb);
+
+    while (matcher.find())
+      er.set(matcher.group(1), matcher.group(2));
+
+    return er;
+  }
+
+  /**
+   * Invokes query() to do the parsing and handles parsing errors. 
+   * 
+   * @return an array of EventRecords that holds one element that represents
+   * the current state of the hardware sensors
+   */
+  public EventRecord[] monitor() {
+    EventRecord[] recs = new EventRecord[1];
+
+    try {
+      recs[0] = query(null);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+
+    return recs;
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    return ("lm-sensors parser");
+  }
+
+}
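
For illustration: a small sketch of how the readGroup() regular expression pulls readings out of lm-sensors output. The sample text and the SensorsGroupDemo class name are assumptions; real "sensors -A" output varies between chips.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SensorsGroupDemo {
      public static void main(String[] args) {
        StringBuffer sample = new StringBuffer(
            "fan1:        3600 RPM  (min = 0 RPM)\n"
          + "temp1:       +42 C  (high = +60 C)\n"
          + "Core 0:      +38 C  (high = +76 C)\n");

        // Same pattern readGroup() builds for each sensor-name prefix.
        for (String prefix : new String[] { "fan", "temp", "Core" }) {
          Pattern pattern = Pattern.compile(".*(" + prefix
              + "\\s*\\d*)\\s*:\\s*(\\+?\\d+)", Pattern.MULTILINE);
          Matcher matcher = pattern.matcher(sample);
          while (matcher.find())
            System.out.println(matcher.group(1) + " -> " + matcher.group(2));
        }
        // Prints:
        //   fan1 -> 3600
        //   temp1 -> +42
        //   Core 0 -> +38
      }
    }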

+ 163 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SerializedRecord.java

@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.text.DateFormat;
+
+/**********************************************************
+ * Objects of this class hold the serialized representations
+ * of EventRecords. A SerializedRecord is essentially an EventRecord
+ * with all its property values converted to strings. It also provides 
+ * some convenience methods for printing the property fields in a 
+ * more readable way.
+ *
+ **********************************************************/
+
+public class SerializedRecord {
+
+  HashMap<String, String> fields;
+  private static DateFormat dateFormatter =
+    DateFormat.getDateTimeInstance(DateFormat.LONG, DateFormat.LONG);
+
+  /**
+   * Create the SerializedRecord given an EventRecord.
+   */
+  
+  public SerializedRecord(EventRecord source) {
+    fields = new HashMap<String, String>();
+    fields.clear();
+
+    for (String k : source.getMap().keySet()) {
+      ArrayList<String> strs = getStrings(source.getMap().get(k));
+      if (strs.size() == 1)
+        fields.put(k, strs.get(0));
+      else
+        for (int i = 0; i < strs.size(); i++)
+          fields.put(k + "#" + i, strs.get(i));
+    }
+
+  }
+
+  /**
+   * Extract String representations from an Object.
+   * 
+   * @param o the input object
+   * 
+   * @return an ArrayList that contains Strings found in o
+   */
+  private ArrayList<String> getStrings(Object o) {
+    ArrayList<String> retval = new ArrayList<String>();
+    retval.clear();
+    if (o == null)
+      retval.add("null");
+    else if (o instanceof String)
+      retval.add((String) o);
+    else if (o instanceof Calendar)
+      retval.add(dateFormatter.format(((Calendar) o).getTime()));
+    else if (o instanceof InetAddress[])
+      for (InetAddress ip : ((InetAddress[]) o))
+        retval.add(ip.getHostAddress());
+    else if (o instanceof String[])
+      for (String s : (String []) o)
+        retval.add(s);
+    else
+      retval.add(o.toString());
+
+    return retval;
+  }
+
+  /**
+   * Set the value of a property of the EventRecord.
+   * 
+   * @param fieldName the name of the property to set
+   * @param fieldValue the value of the property to set
+   * 
+   */
+  public void set(String fieldName, String fieldValue) {
+    fields.put(fieldName, fieldValue);
+  }
+
+  /**
+   * Get the value of a property of the EventRecord.
+   * If the property with the specific key is not found,
+   * null is returned.
+   * 
+   * @param fieldName the name of the property to get.
+   */
+  public String get(String fieldName) {
+    return fields.get(fieldName);
+  }
+
+  /**
+   * Arrange the keys to provide a more readable printing order:
+   * first goes the timestamp, then the hostname and then the type, followed
+   * by all other keys found.
+   * 
+   * @param keys The input ArrayList of keys to re-arrange.
+   */
+  public static void arrangeKeys(ArrayList<String> keys) {
+    move(keys, "timestamp", 0);
+    move(keys, "hostname", 1);
+    move(keys, "type", 2);
+  }
+
+  private static void move(ArrayList<String> keys, String key, int position) {
+    int cur = keys.indexOf(key);
+    if (cur == -1)
+      return;
+    keys.set(cur, keys.get(position));
+    keys.set(position, key);
+  }
+
+  /**
+   * Check if the SerializedRecord is a valid one, i.e., whether
+   * it represents meaningful metric values.
+   * 
+   * @return true if the EventRecord is a valid one, false otherwise.
+   */
+  public boolean isValid() {
+    return !("invalid".equalsIgnoreCase(fields.get("hostname")));
+  }
+
+  
+  /**
+   * Creates and returns a string representation of the object
+   * 
+   * @return a String representing the object
+   */
+
+  public String toString() {
+    String retval = "";
+    ArrayList<String> keys = new ArrayList<String>(fields.keySet());
+    arrangeKeys(keys);
+
+    for (int i = 0; i < keys.size(); i++) {
+      String value = fields.get(keys.get(i));
+      if (value == null)
+        retval += keys.get(i) + ":\tnull\n";
+      else
+        retval += keys.get(i) + ":\t" + value + "\n";
+    }
+    return retval;
+  }
+}
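
For illustration: a standalone sketch of the key re-ordering done by arrangeKeys()/move(), which places timestamp, hostname and type first. The key names other than those three, and the ArrangeKeysDemo class name, are made up for the example.

    import java.util.ArrayList;
    import java.util.Arrays;

    public class ArrangeKeysDemo {
      public static void main(String[] args) {
        ArrayList<String> keys = new ArrayList<String>(
            Arrays.asList("ips#0", "type", "message", "hostname", "timestamp"));

        move(keys, "timestamp", 0);
        move(keys, "hostname", 1);
        move(keys, "type", 2);

        System.out.println(keys);
        // Prints: [timestamp, hostname, type, message, ips#0]
      }

      // Same swap-into-place logic as SerializedRecord.move().
      private static void move(ArrayList<String> keys, String key, int position) {
        int cur = keys.indexOf(key);
        if (cur == -1)
          return;
        keys.set(cur, keys.get(position));
        keys.set(position, key);
      }
    }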

+ 102 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/ShellParser.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * Objects of this class parse the output of system command-line
+ * utilities that can give information about the state of  
+ * various hardware components in the system. Typically, each such
+ * object either invokes a command and reads its output or reads the 
+ * output of one such command from a file on the disk. Currently 
+ * supported utilities include ifconfig, smartmontools, lm-sensors,
+ * /proc/cpuinfo.
+ *
+ **********************************************************/
+
+public abstract class ShellParser implements Monitored {
+
+  /**
+   * Find the first occurrence of a pattern in a piece of text 
+   * and return a specific group.
+   * 
+   *  @param strPattern the regular expression to match
+   *  @param text the text to search
+   *  @param grp the number of the matching group to return
+   *  
+   *  @return a String containing the matched group of the regular expression
+   */
+  protected String findPattern(String strPattern, String text, int grp) {
+
+    Pattern pattern = Pattern.compile(strPattern, Pattern.MULTILINE);
+    Matcher matcher = pattern.matcher(text);
+
+    if (matcher.find(0))
+      return matcher.group(grp);
+
+    return null;
+  }
+
+  /**
+   * Finds all occurrences of a pattern in a piece of text and returns 
+   * the matching groups.
+   * 
+   *  @param strPattern the regular expression to match
+   *  @param text the text to search
+   *  @param grp the number of the matching group to return
+   *  @param separator the string that separates occurrences in the returned value
+   *  
+   *  @return a String that contains all occurrences of strPattern in text, 
+   *  separated by separator
+   */
+  protected String findAll(String strPattern, String text, int grp,
+      String separator) {
+
+    String retval = "";
+    boolean firstTime = true;
+
+    Pattern pattern = Pattern.compile(strPattern);
+    Matcher matcher = pattern.matcher(text);
+
+    while (matcher.find()) {
+      retval += (firstTime ? "" : separator) + matcher.group(grp);
+      firstTime = false;
+    }
+
+    return retval;
+  }
+
+  /**
+   * Insert all EventRecords that can be extracted for
+   * the represented hardware component into a LocalStore.
+   * 
+   * @param ls the LocalStore into which the EventRecords 
+   * are to be stored.
+   */
+  public void monitor(LocalStore ls) {
+    ls.insert(monitor());
+  }
+
+  abstract public EventRecord[] monitor();
+
+  abstract public EventRecord query(String s) throws Exception;
+
+}
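
For illustration: a self-contained sketch of the two helpers ShellParser offers to its subclasses, applied to ifconfig-like text. The sample text and the ShellParserHelpersDemo class name are assumptions for the example.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ShellParserHelpersDemo {

      // Mirrors findPattern(): first match of the requested group, or null.
      static String findPattern(String strPattern, String text, int grp) {
        Matcher m = Pattern.compile(strPattern, Pattern.MULTILINE).matcher(text);
        return m.find(0) ? m.group(grp) : null;
      }

      // Mirrors findAll(): every match of the requested group, joined by a separator.
      static String findAll(String strPattern, String text, int grp, String separator) {
        StringBuilder retval = new StringBuilder();
        Matcher m = Pattern.compile(strPattern).matcher(text);
        boolean first = true;
        while (m.find()) {
          retval.append(first ? "" : separator).append(m.group(grp));
          first = false;
        }
        return retval.toString();
      }

      public static void main(String[] args) {
        String text = "eth0  inet addr:10.0.0.5\n"
                    + "eth1  inet addr:10.0.0.6\n";
        System.out.println(findPattern("inet addr:(\\S+)", text, 1));   // 10.0.0.5
        System.out.println(findAll("inet addr:(\\S+)", text, 1, ", ")); // 10.0.0.5, 10.0.0.6
      }
    }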

+ 126 - 0
src/contrib/failmon/src/java/org/apache/hadoop/contrib/failmon/SystemLogParser.java

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.failmon;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**********************************************************
+ * An object of this class parses a Unix system log file to create
+ * appropriate EventRecords. Currently, only the syslogd logging 
+ * daemon is supported.
+ * 
+ **********************************************************/
+
+public class SystemLogParser extends LogParser {
+
+  static String[] months = { "January", "February", "March", "April", "May",
+      "June", "July", "August", "September", "October", "November", "December" };
+  /**
+   * Create a new parser object.
+   */  
+  public SystemLogParser(String fname) {
+    super(fname);
+    if ((dateformat = Environment.getProperty("log.system.dateformat")) == null)
+      dateformat = "(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d+)";
+    if ((timeformat = Environment.getProperty("log.system.timeformat")) == null)
+      timeformat = "\\d{2}:\\d{2}:\\d{2}";
+  }
+
+  /**
+   * Parses one line of the log. If the line contains a valid 
+   * log entry, then an appropriate EventRecord is returned, after all
+   * relevant fields have been parsed.
+   *
+   *  @param line the log line to be parsed
+   *    
+   *  @return the EventRecord representing the log entry of the line. If 
+   *  the line does not contain a valid log entry, then the EventRecord 
+   *  returned has isValid() = false. When the end-of-file has been reached,
+   *  null is returned to the caller.
+   */
+  public EventRecord parseLine(String line) throws IOException {
+
+    EventRecord retval = null;
+
+    if (line != null) {
+      // process line
+      String patternStr = "(" + dateformat + ")";
+      patternStr += "\\s+";
+      patternStr += "(" + timeformat + ")";
+      patternStr += "\\s+(\\S*)\\s"; // for hostname
+//      patternStr += "\\s*([\\w+\\.?]+)"; // for source
+      patternStr += ":?\\s*(.+)"; // for the message
+      Pattern pattern = Pattern.compile(patternStr);
+      Matcher matcher = pattern.matcher(line);
+      if (matcher.find() && matcher.groupCount() >= 0) {
+        retval = new EventRecord(hostname, ips, parseDate(matcher.group(1),
+            matcher.group(4)), "SystemLog", "Unknown", // loglevel
+            "Unknown", // source
+            matcher.group(6)); // message
+      } else {
+        retval = new EventRecord();
+      }
+    }
+
+    return retval;
+  }
+
+  /**
+   * Parse a date found in the system log.
+   * 
+   * @return a Calendar representing the date
+   */
+  protected Calendar parseDate(String strDate, String strTime) {
+    Calendar retval = Calendar.getInstance();
+    // set date
+    String[] fields = strDate.split("\\s+");
+    retval.set(Calendar.MONTH, parseMonth(fields[0]));
+    retval.set(Calendar.DATE, Integer.parseInt(fields[1]));
+    // set time
+    fields = strTime.split(":");
+    retval.set(Calendar.HOUR_OF_DAY, Integer.parseInt(fields[0]));
+    retval.set(Calendar.MINUTE, Integer.parseInt(fields[1]));
+    retval.set(Calendar.SECOND, Integer.parseInt(fields[2]));
+    return retval;
+  }
+
+  /**
+   * Convert the name of a month to the corresponding int value.
+   * 
+   * @return the int representation of the month.
+   */
+  private int parseMonth(String month) {
+    for (int i = 0; i < months.length; i++)
+      if (months[i].startsWith(month))
+        return i;
+    return -1;
+  }
+  
+  /**
+   * Return a String with information about this class
+   * 
+   * @return A String describing this class
+   */
+  public String getInfo() {
+    return ("System Log Parser for file : " + file.getAbsoluteFile());
+  }
+}
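
For illustration: a sketch of the regular expression parseLine() builds for a default syslogd entry, applied to a sample line. The sample line and the SyslogLineDemo class name are assumptions for the example.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class SyslogLineDemo {
      public static void main(String[] args) {
        // Default date/time formats from the SystemLogParser constructor.
        String dateformat = "(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d+)";
        String timeformat = "\\d{2}:\\d{2}:\\d{2}";
        String patternStr = "(" + dateformat + ")"      // group 1: date (month/day in groups 2-3)
                          + "\\s+(" + timeformat + ")"  // group 4: time
                          + "\\s+(\\S*)\\s"             // group 5: hostname
                          + ":?\\s*(.+)";               // group 6: message

        String line = "Jun 12 09:45:02 node01 kernel: sda: drive reset";
        Matcher m = Pattern.compile(patternStr).matcher(line);
        if (m.find()) {
          System.out.println("date    = " + m.group(1)); // Jun 12
          System.out.println("time    = " + m.group(4)); // 09:45:02
          System.out.println("host    = " + m.group(5)); // node01
          System.out.println("message = " + m.group(6)); // kernel: sda: drive reset
        }
      }
    }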