
commit cba9e3807d2cc5d427d343a643ffd56362765646
Author: Konstantin Boudnik <cos@goodenter-lm.local>
Date: Fri Feb 19 14:35:02 2010 -0800

Merges yahoo-hadoop-0.20.1xx and yahoo-hadoop-0.20-automation.
Incorporates the following patches from yahoo-hadoop-0.20-automation branch

(individual patch references not preserved)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077176 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago
Commit
a9817ce2ff
37 files changed: 3,813 additions and 87 deletions
  1. build.xml (+141 -76)
  2. src/saveVersion.sh (+1 -1)
  3. src/test/aop/build/aop.xml (+139 -10)
  4. src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj (+64 -0)
  5. src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj (+8 -0)
  6. src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj (+57 -0)
  7. src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj (+234 -0)
  8. src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj (+78 -0)
  9. src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj (+143 -0)
  10. src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java (+197 -0)
  11. src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java (+54 -0)
  12. src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java (+96 -0)
  13. src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java (+116 -0)
  14. src/test/system/java/org/apache/hadoop/mapred/TestCluster.java (+101 -0)
  15. src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java (+181 -0)
  16. src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java (+129 -0)
  17. src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java (+299 -0)
  18. src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (+91 -0)
  19. src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java (+121 -0)
  20. src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (+72 -0)
  21. src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java (+28 -0)
  22. src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java (+12 -0)
  23. src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java (+72 -0)
  24. src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java (+24 -0)
  25. src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java (+32 -0)
  26. src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java (+40 -0)
  27. src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java (+57 -0)
  28. src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java (+121 -0)
  29. src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java (+323 -0)
  30. src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java (+90 -0)
  31. src/test/system/java/org/apache/hadoop/test/system/ProcessInfo.java (+59 -0)
  32. src/test/system/java/org/apache/hadoop/test/system/ProcessInfoImpl.java (+141 -0)
  33. src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java (+71 -0)
  34. src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java (+35 -0)
  35. src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java (+274 -0)
  36. src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java (+29 -0)
  37. src/test/testjar/UserNamePermission.java (+83 -0)

+ 141 - 76
build.xml

@@ -621,15 +621,26 @@
   <!--                                                                    -->
   <!-- ================================================================== -->
   <target name="examples" depends="jar, compile-examples" description="Make the Hadoop examples jar.">
-    <jar jarfile="${build.dir}/${examples.final.name}.jar"
-         basedir="${build.examples}">
-      <manifest>
-        <attribute name="Main-Class" 
-                   value="org/apache/hadoop/examples/ExampleDriver"/>
-      </manifest>
-    </jar>
+    <macro-jar-examples
+      build.dir="${build.dir}"
+      basedir="${build.examples}">
+    </macro-jar-examples>
   </target>
 
+  <macrodef name="macro-jar-examples">
+    <attribute name="build.dir" />
+    <attribute name="basedir" />
+    <sequential>
+      <jar jarfile="@{build.dir}/${examples.final.name}.jar"
+           basedir="@{basedir}">
+        <manifest>
+          <attribute name="Main-Class"
+                    value="org/apache/hadoop/examples/ExampleDriver"/>
+        </manifest>
+      </jar>
+    </sequential>
+  </macrodef>
+
   <target name="tools-jar" depends="jar, compile-tools" 
           description="Make the Hadoop tools jar.">
     <jar jarfile="${build.dir}/${tools.final.name}.jar"
@@ -791,6 +802,7 @@
     description="Make hadoop-fi.jar">
     <macro-jar-fault-inject
       target.name="jar"
+      build.dir="${build-fi.dir}"
       jar.final.name="final.name"
       jar.final.value="${final.name}-fi" />
   </target>
@@ -840,76 +852,103 @@
   <!-- Run unit tests                                                     --> 
   <!-- ================================================================== -->
   <target name="test-core" depends="jar-test" description="Run core unit tests">
-
-    <delete file="${test.build.dir}/testsfailed"/>
-    <delete dir="${test.build.data}"/>
-    <mkdir dir="${test.build.data}"/>
-    <delete dir="${test.log.dir}"/>
-    <mkdir dir="${test.log.dir}"/>
-  	<copy file="${test.src.dir}/hadoop-policy.xml" 
-  	  todir="${test.build.extraconf}" />
-    <copy file="${test.src.dir}/fi-site.xml"
-      todir="${test.build.extraconf}" />
-    <junit showoutput="${test.output}"
-      printsummary="${test.junit.printsummary}"
-      haltonfailure="${test.junit.haltonfailure}"
-      fork="yes"
-      forkmode="${test.junit.fork.mode}"
-      maxmemory="${test.junit.maxmemory}"
-      dir="${basedir}" timeout="${test.timeout}"
-      errorProperty="tests.failed" failureProperty="tests.failed">
-      <sysproperty key="test.build.data" value="${test.build.data}"/>
-      <sysproperty key="test.tools.input.dir" value="${test.tools.input.dir}"/>
-      <sysproperty key="test.cache.data" value="${test.cache.data}"/>    	
-      <sysproperty key="test.debug.data" value="${test.debug.data}"/>
-      <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
-      <sysproperty key="test.src.dir" value="${test.src.dir}"/>
-      <sysproperty key="taskcontroller-path" value="${taskcontroller-path}"/>
-      <sysproperty key="taskcontroller-ugi" value="${taskcontroller-ugi}"/>
-      <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
-      <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
-      <sysproperty key="java.library.path"
-       value="${build.native}/lib:${lib.dir}/native/${build.platform}"/>
-      <sysproperty key="install.c++.examples" value="${install.c++.examples}"/>
-      <!-- set io.compression.codec.lzo.class in the child jvm only if it is set -->
-	  <syspropertyset dynamic="no">
-		  <propertyref name="io.compression.codec.lzo.class"/>
-	  </syspropertyset>
-      <!-- set compile.c++ in the child jvm only if it is set -->
-      <syspropertyset dynamic="no">
-         <propertyref name="compile.c++"/>
-      </syspropertyset>
-      <classpath refid="${test.classpath.id}"/>
-      <syspropertyset id="FaultProbabilityProperties">
-        <propertyref regex="fi.*"/>
-      </syspropertyset>
-      <formatter type="${test.junit.output.format}" />
-      <batchtest todir="${test.build.dir}" if="tests.notestcase">
-        <fileset dir="${test.src.dir}"
-           includes="**/${test.include}.java"
-           excludes="**/${test.exclude}.java aop/**" />
-      </batchtest>
-      <batchtest todir="${test.build.dir}" if="tests.notestcase.fi">
-        <fileset dir="${test.src.dir}/aop"
-          includes="**/${test.include}.java"
-          excludes="**/${test.exclude}.java" />
-      </batchtest>
-      <batchtest todir="${test.build.dir}" if="tests.testcase">
-        <fileset dir="${test.src.dir}"
-          includes="**/${testcase}.java" excludes="aop/**"/>
-      </batchtest>
-      <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
-        <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java"/>
-      </batchtest>
-      <!--The following batch is for very special occasions only when
-      a non-FI tests are needed to be executed against FI-environment -->
-      <batchtest todir="${test.build.dir}" if="tests.testcaseonly">
-        <fileset dir="${test.src.dir}" includes="**/${testcase}.java"/>
-      </batchtest>
-    </junit>
-    <antcall target="checkfailure"/>
+    <macro-test-runner classpath="${test.classpath.id}"
+                       test.dir="${test.build.dir}"
+                       fileset.dir="${test.src.dir}"
+                       >
+    </macro-test-runner>
   </target>   
 
+  <macrodef name="macro-test-runner">
+    <attribute name="classpath" />
+    <attribute name="test.dir" />
+    <attribute name="fileset.dir" />
+    <attribute name="hadoop.home" default="" />
+    <attribute name="hadoop.conf.dir" default="" />
+    <attribute name="hadoop.conf.dir.deployed" default="" />
+    <sequential>
+      <delete dir="@{test.dir}/data" />
+      <mkdir dir="@{test.dir}/data" />
+      <delete dir="@{test.dir}/logs" />
+      <mkdir dir="@{test.dir}/logs" />
+      <copy file="${test.src.dir}/hadoop-policy.xml"
+            todir="@{test.dir}/extraconf" />
+      <copy file="${test.src.dir}/fi-site.xml"
+            todir="@{test.dir}/extraconf" />
+      <junit showoutput="${test.output}"
+             printsummary="${test.junit.printsummary}"
+             haltonfailure="${test.junit.haltonfailure}"
+             fork="yes"
+             forkmode="${test.junit.fork.mode}"
+             maxmemory="${test.junit.maxmemory}"
+             dir="${basedir}"
+             timeout="${test.timeout}"
+             errorProperty="tests.failed"
+             failureProperty="tests.failed">
+        <sysproperty key="test.build.data" value="${test.build.data}" />
+        <sysproperty key="test.tools.input.dir"
+                     value="${test.tools.input.dir}" />
+        <sysproperty key="test.cache.data" value="${test.cache.data}" />
+        <sysproperty key="test.debug.data" value="${test.debug.data}" />
+        <sysproperty key="hadoop.log.dir" value="${test.log.dir}" />
+        <sysproperty key="test.src.dir" value="${test.src.dir}" />
+        <sysproperty key="taskcontroller-path" value="${taskcontroller-path}" />
+        <sysproperty key="taskcontroller-user" value="${taskcontroller-user}" />
+        <sysproperty key="test.build.extraconf"
+                     value="@{test.dir}/extraconf" />
+        <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml" />
+        <sysproperty key="java.library.path"
+                     value="${build.native}/lib:${lib.dir}/native/${build.platform}" />
+        <sysproperty key="install.c++.examples"
+                     value="${install.c++.examples}" />
+        <sysproperty key="testjar"
+                     value="@{test.dir}/testjar" />
+        <!-- System properties that are specifically set for system tests -->
+        <sysproperty key="test.system.hdrc.hadoophome" value="@{hadoop.home}" />
+        <sysproperty key="test.system.hdrc.hadoopconfdir"
+                     value="@{hadoop.conf.dir}" />
+        <sysproperty key="test.system.hdrc.deployed.hadoopconfdir"
+                     value="@{hadoop.conf.dir.deployed}" />
+        <!-- set io.compression.codec.lzo.class in the child jvm only if it is set -->
+        <syspropertyset dynamic="no">
+          <propertyref name="io.compression.codec.lzo.class" />
+        </syspropertyset>
+        <!-- set compile.c++ in the child jvm only if it is set -->
+        <syspropertyset dynamic="no">
+          <propertyref name="compile.c++" />
+        </syspropertyset>
+        <classpath refid="@{classpath}" />
+        <syspropertyset id="FaultProbabilityProperties">
+          <propertyref regex="fi.*" />
+        </syspropertyset>
+        <formatter type="${test.junit.output.format}" />
+        <batchtest todir="@{test.dir}" if="tests.notestcase">
+          <fileset dir="@{fileset.dir}"
+                   includes="**/${test.include}.java"
+                   excludes="**/${test.exclude}.java aop/** system/**" />
+        </batchtest>
+        <batchtest todir="${test.build.dir}" if="tests.notestcase.fi">
+          <fileset dir="${test.src.dir}/aop"
+                   includes="**/${test.include}.java"
+                   excludes="**/${test.exclude}.java" />
+        </batchtest>
+        <batchtest todir="@{test.dir}" if="tests.testcase">
+          <fileset dir="@{fileset.dir}"
+            includes="**/${testcase}.java" excludes="aop/** system/**"/>
+        </batchtest>
+        <batchtest todir="${test.build.dir}" if="tests.testcase.fi">
+          <fileset dir="${test.src.dir}/aop" includes="**/${testcase}.java" />
+        </batchtest>
+        <!--The following batch is for very special occasions only when
+                a non-FI tests are needed to be executed against FI-environment -->
+        <batchtest todir="${test.build.dir}" if="tests.testcaseonly">
+          <fileset dir="${test.src.dir}" includes="**/${testcase}.java" />
+        </batchtest>
+      </junit>
+      <antcall target="checkfailure"/>
+    </sequential>
+  </macrodef>
+
   <target name="checkfailure" if="tests.failed">
     <touch file="${test.build.dir}/testsfailed"/>
     <fail unless="continueOnFailure">Tests failed!</fail>
@@ -1400,6 +1439,32 @@
     </chmod>
   </target>
 
+  <target name="binary-system" depends="bin-package, jar-system, jar-test-system"
+     description="make system test package for deployment">
+    <copy todir="${system-test-build-dir}/${final.name}">
+      <fileset dir="${dist.dir}">
+      </fileset>
+    </copy>
+    <copy todir="${system-test-build-dir}/${final.name}" 
+      file="${system-test-build-dir}/${core.final.name}.jar" overwrite="true"/>
+    <copy todir="${system-test-build-dir}/${final.name}"
+      file="${system-test-build-dir}/${test.final.name}.jar" overwrite="true"/>
+    <macro_tar 
+      param.destfile="${system-test-build-dir}/${final.name}-bin.tar.gz">
+        <param.listofitems>
+          <tarfileset dir="${system-test-build-dir}" mode="664">
+            <exclude name="${final.name}/bin/*" />
+            <exclude name="${final.name}/src/**" />
+            <exclude name="${final.name}/docs/**" />
+            <include name="${final.name}/**" />
+          </tarfileset>
+          <tarfileset dir="${build.dir}" mode="755">
+            <include name="${final.name}/bin/*" />
+          </tarfileset>
+        </param.listofitems>
+      </macro_tar>
+  </target>
+  
   <target name="binary" depends="bin-package" description="Make tarball without source and documentation">
     <macro_tar param.destfile="${build.dir}/${final.name}-bin.tar.gz">
       <param.listofitems>
@@ -1430,7 +1495,7 @@
   <!-- ================================================================== -->
   <!-- Clean.  Delete the build files, and their directories              -->
   <!-- ================================================================== -->
-  <target name="clean" depends="clean-contrib, clean-sign " description="Clean.  Delete the build files, and their directories">
+  <target name="clean" depends="clean-contrib, clean-sign, clean-fi" description="Clean.  Delete the build files, and their directories">
     <delete dir="${build.dir}"/>
     <delete dir="${docs.src}/build"/>
     <delete dir="${src.docs.cn}/build"/>

+ 1 - 1
src/saveVersion.sh

@@ -37,7 +37,7 @@ mkdir -p $build_dir/src/org/apache/hadoop
 cat << EOF | \
   sed -e "s/VERSION/$version/" -e "s/USER/$user/" -e "s/DATE/$date/" \
       -e "s|URL|$url|" -e "s/REV/$revision/" \
-      > build/src/org/apache/hadoop/package-info.java
+      > $build_dir/src/org/apache/hadoop/package-info.java
 /*
  * Generated by src/saveVersion.sh
  */

+ 139 - 10
src/test/aop/build/aop.xml

@@ -15,12 +15,18 @@
    limitations under the License.
 -->
 <project name="aspects">
+  <!-- Properties common for all fault injections -->
   <property name="build-fi.dir" value="${basedir}/build-fi"/>
   <property name="hadoop-fi.jar" location="${build.dir}/${final.name}-fi.jar" />
   <property name="compile-inject.output" value="${build-fi.dir}/compile-fi.log"/>
   <property name="aspectversion" value="1.6.5"/>
   <property file="${basedir}/build.properties"/>
 
+  <!-- Properties related to system fault injection and tests -->
+  <property name="system-test-build-dir" value="${build-fi.dir}/system"/>
+
+  <!-- Properties specifically for system fault-injections and system tests -->
+ 
   <!--All Fault Injection (FI) related targets are located in this session -->
     
   <target name="clean-fi">
@@ -44,10 +50,11 @@
     <echo message="Start weaving aspects in place"/>
     <iajc
       encoding="${build.encoding}" 
-      srcdir="${core.src.dir};${mapred.src.dir};${hdfs.src.dir};${build.src};${test.src.dir}/aop" 
-      includes="org/apache/hadoop/**/*.java, org/apache/hadoop/**/*.aj"
+      srcdir="${core.src.dir};${mapred.src.dir};${hdfs.src.dir};${build.src};
+              ${src.dir.path}"
+      includes="**/org/apache/hadoop/**/*.java, **/org/apache/hadoop/**/*.aj"
       excludes="org/apache/hadoop/record/**/*"
-      destDir="${build.classes}"
+      destDir="${dest.dir}"
       debug="${javac.debug}"
       target="${javac.version}"
       source="${javac.version}"
@@ -55,7 +62,15 @@
       fork="true"
       maxmem="256m"
       >
-      <classpath refid="test.classpath"/>
+
+      <classpath>
+       <path refid="test.classpath"/>
+       <fileset dir="${build-fi.dir}/test/testjar">
+          <include name="**/*.jar" />
+          <exclude name="**/excluded/" />
+       </fileset>
+     </classpath>
+
     </iajc>
     <loadfile property="injection.failure" srcfile="${compile-inject.output}">
      <filterchain>
@@ -70,16 +85,129 @@
     <echo message="Weaving of aspects is finished"/>
   </target>
 
-  <target name="injectfaults" 
-  	description="Instrument classes with faults and other AOP advices">
+  <!-- Classpath for running system tests -->
+  <path id="test.system.classpath">
+        <pathelement location="${hadoop.conf.dir.deployed}" />
+        <pathelement location="${hadoop.conf.dir}" />
+        <pathelement location="${system-test-build-dir}/test/extraconf" />
+        <pathelement location="${system-test-build-dir}/test/classes" />
+        <pathelement location="${system-test-build-dir}/classes" />
+        <pathelement location="${test.src.dir}" />
+        <pathelement location="${build-fi.dir}" />
+        <pathelement location="${build-fi.dir}/tools" />
+        <pathelement path="${clover.jar}" />
+        <fileset dir="${test.lib.dir}">
+          <include name="**/*.jar" />
+          <exclude name="**/excluded/" />
+        </fileset>
+        <fileset dir="${system-test-build-dir}">
+           <include name="**/*.jar" />
+           <exclude name="**/excluded/" />
+         </fileset>
+         <fileset dir="${build-fi.dir}/test/testjar">
+           <include name="**/*.jar" />
+           <exclude name="**/excluded/" />
+         </fileset>
+        <path refid="classpath" />
+  </path>
+
+  <!-- ================ -->
+  <!-- run system tests -->
+  <!-- ================ -->
+  <target name="test-system" depends="-test-system-deployed, -test-system-local"
+    description="Run system tests">
+  </target>
+
+  <target name="-test-system-local"
+    depends="ivy-retrieve-common, prepare-test-system" 
+    unless="hadoop.conf.dir.deployed">
+    <macro-jar-examples
+      build.dir="${system-test-build-dir}"
+      basedir="${system-test-build-dir}/examples">
+    </macro-jar-examples>
+    <macro-test-runner classpath="test.system.classpath"
+                       test.dir="${system-test-build-dir}/test"
+                       fileset.dir="${test.src.dir}/system/java"
+                       hadoop.home="${hadoop.home}"
+                       hadoop.conf.dir="${hadoop.conf.dir}">
+    </macro-test-runner>
+  </target>
+  <target name="-test-system-deployed"
+    depends="ivy-retrieve-common, prepare-test-system" 
+    if="hadoop.conf.dir.deployed">
+    <macro-jar-examples
+      build.dir="${system-test-build-dir}"
+      basedir="${system-test-build-dir}/examples">
+    </macro-jar-examples>
+    <macro-test-runner classpath="test.system.classpath"
+                       test.dir="${system-test-build-dir}/test"
+                       fileset.dir="${test.src.dir}/system/java"
+                       hadoop.home="${hadoop.home}"
+                       hadoop.conf.dir="${hadoop.conf.dir}"
+                       hadoop.conf.dir.deployed="${hadoop.conf.dir.deployed}">
+    </macro-test-runner>
+  </target>
+
+  <target name="prepare-test-system" depends="jar-test-system">
+    <subant buildpath="build.xml" target="inject-system-faults">
+      <property name="build.dir" value="${system-test-build-dir}" />
+    </subant>
+  </target>
+
+  <target name="injectfaults"
+          description="Instrument classes with faults and other AOP advices">
     <mkdir dir="${build-fi.dir}"/>
     <delete file="${compile-inject.output}"/>
-    <subant buildpath="${basedir}" target="compile-fault-inject"
-      output="${compile-inject.output}">
-      <property name="build.dir" value="${build-fi.dir}"/>
+    <weave-injectfault-aspects dest.dir="${build-fi.dir}/classes}"
+                               src.dir="${test.src.dir}/aop">
+    </weave-injectfault-aspects>
+  </target>
+
+  <!-- =============================================================== -->
+  <!-- Create hadoop-{version}-dev-core.jar required to be deployed on -->
+  <!-- cluster for system tests                                        -->
+  <!-- =============================================================== -->
+  <target name="jar-system"
+          depends="inject-system-faults"
+          description="make hadoop.jar">
+    <macro-jar-fault-inject target.name="jar"
+      build.dir="${system-test-build-dir}"
+      jar.final.name="final.name"
+      jar.final.value="${final.name}">
+    </macro-jar-fault-inject>
+  </target>
+
+  <target name="jar-test-system" depends="inject-system-faults"
+    description="Make hadoop-test.jar with system fault-injection">
+    <subant buildpath="build.xml" target="jar-test">
+      <property name="build.dir" value="${system-test-build-dir}"/>
+      <property name="test.build.classes"
+        value="${system-test-build-dir}/test/classes"/>
     </subant>
   </target>
 
+  <macrodef name="weave-injectfault-aspects">
+    <attribute name="dest.dir" />
+    <attribute name="src.dir" />
+    <sequential>
+      <subant buildpath="build.xml" target="compile-fault-inject"
+        output="${compile-inject.output}">
+        <property name="build.dir" value="${build-fi.dir}" />
+        <property name="src.dir.path" value="@{src.dir}" />
+        <property name="dest.dir" value="@{dest.dir}" />
+      </subant>
+    </sequential>
+  </macrodef>
+
+  <target name="inject-system-faults" description="Inject system faults">
+    <property name="build-fi.dir" value="${system-test-build-dir}" />
+    <mkdir dir="${build-fi.dir}"/>
+    <delete file="${compile-inject.output}"/>
+    <weave-injectfault-aspects dest.dir="${system-test-build-dir}/classes"
+                               src.dir="${test.src.dir}/system">
+    </weave-injectfault-aspects>
+    </target>
+
   <macrodef name="macro-run-tests-fault-inject">
     <attribute name="target.name" />
     <attribute name="testcasesonly" />
@@ -99,11 +227,12 @@
   <!-- ================================================================== -->
   <macrodef name="macro-jar-fault-inject">
     <attribute name="target.name" />
+    <attribute name="build.dir" />
     <attribute name="jar.final.name" />
     <attribute name="jar.final.value" />
     <sequential>
       <subant buildpath="build.xml" target="@{target.name}">
-        <property name="build.dir" value="${build-fi.dir}"/>
+        <property name="build.dir" value="@{build.dir}"/>
         <property name="@{jar.final.name}" value="@{jar.final.value}"/>
         <property name="jar.extra.properties.list" 
         	  value="${test.src.dir}/fi-site.xml" />

+ 64 - 0
src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj

@@ -0,0 +1,64 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+
+/**
+ * Aspect which injects the basic protocol functionality which is to be
+ * implemented by all the services which implement {@link ClientProtocol}
+ * 
+ * Aspect also injects default implementation for the {@link JTProtocol}
+ */
+
+public aspect JTProtocolAspect {
+
+  // Make the ClientProtocl extend the JTprotocol
+  declare parents : JobSubmissionProtocol extends JTProtocol;
+
+  /*
+   * Start of default implementation of the methods in JTProtocol
+   */
+
+  public Configuration JTProtocol.getDaemonConf() throws IOException {
+    return null;
+  }
+
+  public JobInfo JTProtocol.getJobInfo(JobID jobID) throws IOException {
+    return null;
+  }
+
+  public TaskInfo JTProtocol.getTaskInfo(TaskID taskID) throws IOException {
+    return null;
+  }
+
+  public TTInfo JTProtocol.getTTInfo(String trackerName) throws IOException {
+    return null;
+  }
+
+  public JobInfo[] JTProtocol.getAllJobInfo() throws IOException {
+    return null;
+  }
+
+  public TaskInfo[] JTProtocol.getTaskInfo(JobID jobID) throws IOException {
+    return null;
+  }
+
+  public TTInfo[] JTProtocol.getAllTTInfo() throws IOException {
+    return null;
+  }
+  
+  public boolean JTProtocol.isJobRetired(JobID jobID) throws IOException {
+    return false;
+  }
+  
+  public String JTProtocol.getJobHistoryLocationForRetiredJob(JobID jobID) throws IOException {
+    return "";
+  }
+}
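
Note: the aspect above only declares that JobSubmissionProtocol extends JTProtocol and supplies stub defaults; the real values come from JobTrackerAspect further down. As a rough, hypothetical illustration of how a test could consume this injected view (assuming a JTProtocol proxy has already been obtained, e.g. via the getProxy() call seen in TestCluster.java below):

  import java.io.IOException;

  import org.apache.hadoop.mapreduce.test.system.JTProtocol;
  import org.apache.hadoop.mapreduce.test.system.JobInfo;

  // Hypothetical helper: list every job the woven JobTracker currently knows about.
  public class JobLister {
    public static void dumpJobs(JTProtocol jt) throws IOException {
      JobInfo[] jobs = jt.getAllJobInfo();   // provided by JTProtocolAspect/JobTrackerAspect weaving
      for (JobInfo job : jobs) {
        System.out.println(job.getID() + " running maps: " + job.runningMaps()
            + ", finished maps: " + job.finishedMaps());
      }
    }
  }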

+ 8 - 0
src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj

@@ -0,0 +1,8 @@
+package org.apache.hadoop.mapred;
+
+public privileged aspect JobClientAspect {
+
+  public JobSubmissionProtocol JobClient.getProtocol() {
+    return jobSubmitClient;
+  }
+}

+ 57 - 0
src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj

@@ -0,0 +1,57 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+
+/**
+ * Aspect to add a utility method in the JobInProgress for easing up the
+ * construction of the JobInfo object.
+ */
+privileged aspect JobInProgressAspect {
+
+  /**
+   * Returns a read only view of the JobInProgress object which is used by the
+   * client.
+   * 
+   * @return JobInfo of the current JobInProgress object
+   */
+  public JobInfo JobInProgress.getJobInfo() {
+    String historyLoc = getHistoryPath();
+    if (tasksInited.get()) {
+      return new JobInfoImpl(
+          this.getJobID(), this.isSetupLaunched(), this.isSetupFinished(), this
+              .isCleanupLaunched(), this.runningMaps(), this.runningReduces(),
+          this.pendingMaps(), this.pendingReduces(), this.finishedMaps(), this
+              .finishedReduces(), this.getStatus(), historyLoc, this
+              .getBlackListedTrackers(), false, this.numMapTasks,
+          this.numReduceTasks, this.isHistoryFileCopied());
+    } else {
+      return new JobInfoImpl(
+          this.getJobID(), false, false, false, 0, 0, this.pendingMaps(), this
+              .pendingReduces(), this.finishedMaps(), this.finishedReduces(),
+          this.getStatus(), historyLoc, this.getBlackListedTrackers(), this
+              .isComplete(), this.numMapTasks, this.numReduceTasks, 
+              this.isHistoryFileCopied());
+    }
+  }
+  
+  private String JobInProgress.getHistoryPath() {
+    String historyLoc = "";
+    if(this.isComplete()) {
+      historyLoc = this.getHistoryFile();
+    } else {
+      String historyFileName = null;
+      try {
+        historyFileName  = JobHistory.JobInfo.getJobHistoryFileName(conf, 
+            jobId);
+      } catch(IOException e) {
+      }
+      if(historyFileName != null) {
+        historyLoc = JobHistory.JobInfo.getJobHistoryLogLocation(
+            historyFileName).toString();
+      }
+    }
+    return historyLoc;
+  }
+
+}

+ 234 - 0
src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj

@@ -0,0 +1,234 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * Aspect class which injects the code for {@link JobTracker} class.
+ * 
+ */
+public privileged aspect JobTrackerAspect {
+
+
+  public Configuration JobTracker.getDaemonConf() throws IOException {
+    return conf;
+  }
+  /**
+   * Method to get the read only view of the job and its associated information.
+   * 
+   * @param jobID
+   *          id of the job for which information is required.
+   * @return JobInfo of the job requested
+   * @throws IOException
+   */
+  public JobInfo JobTracker.getJobInfo(JobID jobID) throws IOException {
+    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
+        .downgrade(jobID));
+    if (jip == null) {
+      LOG.warn("No job present for : " + jobID);
+      return null;
+    }
+    JobInfo info;
+    synchronized (jip) {
+      info = jip.getJobInfo();
+    }
+    return info;
+  }
+
+  /**
+   * Method to get the read only view of the task and its associated
+   * information.
+   * 
+   * @param taskID
+   * @return
+   * @throws IOException
+   */
+  public TaskInfo JobTracker.getTaskInfo(TaskID taskID) throws IOException {
+    TaskInProgress tip = getTip(org.apache.hadoop.mapred.TaskID
+        .downgrade(taskID));
+
+    if (tip == null) {
+      LOG.warn("No task present for : " + taskID);
+      return null;
+    }
+    TaskInfo info;
+    TaskStatus[] status = tip.getTaskStatuses();
+    synchronized (tip) {
+      if (status == null) {
+        if (tip.isMapTask()) {
+          status = new MapTaskStatus[]{};
+        }
+        else {
+          status = new ReduceTaskStatus[]{};
+        }
+      }
+      info = new TaskInfoImpl(tip.getTIPId(), tip.getProgress(), tip
+          .getActiveTasks().size(), tip.numKilledTasks(), 
+          tip.numTaskFailures(), status);
+    }
+    return info;
+  }
+
+  public TTInfo JobTracker.getTTInfo(String trackerName) throws IOException {
+    org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker tt = taskTrackers
+        .get(trackerName);
+    if (tt == null) {
+      LOG.warn("No task tracker with name : " + trackerName + " found");
+      return null;
+    }
+    TaskTrackerStatus status = tt.getStatus();
+    TTInfo info = new TTInfoImpl(status.trackerName, status);
+    return info;
+  }
+
+  // XXX Below two method don't reuse getJobInfo and getTaskInfo as there is a
+  // possibility that retire job can run and remove the job from JT memory
+  // during
+  // processing of the RPC call.
+  public JobInfo[] JobTracker.getAllJobInfo() throws IOException {
+    List<JobInfo> infoList = new ArrayList<JobInfo>();
+    synchronized (jobs) {
+      for (JobInProgress jip : jobs.values()) {
+        JobInfo info = jip.getJobInfo();
+        infoList.add(info);
+      }
+    }
+    return (JobInfo[]) infoList.toArray(new JobInfo[infoList.size()]);
+  }
+
+  public TaskInfo[] JobTracker.getTaskInfo(JobID jobID) throws IOException {
+    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
+        .downgrade(jobID));
+    if (jip == null) {
+      LOG.warn("Unable to find job : " + jobID);
+      return null;
+    }
+    List<TaskInfo> infoList = new ArrayList<TaskInfo>();
+    TaskStatus[] status;
+    synchronized (jip) {
+      for (TaskInProgress tip : jip.setup) {
+        status = tip.getTaskStatuses();
+        if (status == null) {
+          if (tip.isMapTask()) {
+            status = new MapTaskStatus[]{};
+          }
+          else {
+            status = new ReduceTaskStatus[]{};
+          }
+        }
+        TaskInfo info = new TaskInfoImpl(tip.getTIPId(), tip.getProgress(), tip
+            .getActiveTasks().size(), tip.numKilledTasks(), tip
+            .numTaskFailures(), status);
+        infoList.add(info);
+      }
+      for (TaskInProgress tip : jip.maps) {
+        status = tip.getTaskStatuses();
+        if (status == null) {
+          status = new MapTaskStatus[]{};
+        }
+        TaskInfo info = new TaskInfoImpl(tip.getTIPId(), tip.getProgress(), tip
+            .getActiveTasks().size(), tip.numKilledTasks(), tip
+            .numTaskFailures(), status);
+        infoList.add(info);
+      }
+      for (TaskInProgress tip : jip.reduces) {
+        status = tip.getTaskStatuses();
+        if (status == null) {
+          status = new ReduceTaskStatus[]{};
+        }
+        TaskInfo info = new TaskInfoImpl(tip.getTIPId(), tip.getProgress(), tip
+            .getActiveTasks().size(), tip.numKilledTasks(), tip
+            .numTaskFailures(), status);
+        infoList.add(info);
+      }
+      for (TaskInProgress tip : jip.cleanup) {
+        status = tip.getTaskStatuses();
+        if (status == null) {
+          if (tip.isMapTask()) {
+            status = new MapTaskStatus[]{};
+          }
+          else {
+            status = new ReduceTaskStatus[]{};
+          }
+        }
+        TaskInfo info = new TaskInfoImpl(tip.getTIPId(), tip.getProgress(), tip
+            .getActiveTasks().size(), tip.numKilledTasks(), tip
+            .numTaskFailures(), status);
+        infoList.add(info);
+      }
+    }
+    return (TaskInfo[]) infoList.toArray(new TaskInfo[infoList.size()]);
+  }
+
+  public TTInfo[] JobTracker.getAllTTInfo() throws IOException {
+    List<TTInfo> infoList = new ArrayList<TTInfo>();
+    synchronized (taskTrackers) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus();
+        TTInfo info = new TTInfoImpl(status.trackerName, status);
+        infoList.add(info);
+      }
+    }
+    return (TTInfo[]) infoList.toArray(new TTInfo[infoList.size()]);
+  }
+  
+  public boolean JobTracker.isJobRetired(JobID id) throws IOException {
+    return retireJobs.get(
+        org.apache.hadoop.mapred.JobID.downgrade(id))!=null?true:false;
+  }
+
+  public String JobTracker.getJobHistoryLocationForRetiredJob(
+      JobID id) throws IOException {
+    RetireJobInfo retInfo = retireJobs.get(
+        org.apache.hadoop.mapred.JobID.downgrade(id));
+    if(retInfo == null) {
+      throw new IOException("The retired job information for the job : " 
+          + id +" is not found");
+    } else {
+      return retInfo.getHistoryFile();
+    }
+  }
+  pointcut getVersionAspect(String protocol, long clientVersion) : 
+    execution(public long JobTracker.getProtocolVersion(String , 
+      long) throws IOException) && args(protocol, clientVersion);
+
+  long around(String protocol, long clientVersion) :  
+    getVersionAspect(protocol, clientVersion) {
+    if (protocol.equals(DaemonProtocol.class.getName())) {
+      return DaemonProtocol.versionID;
+    } else if (protocol.equals(JTProtocol.class.getName())) {
+      return JTProtocol.versionID;
+    } else {
+      return proceed(protocol, clientVersion);
+    }
+  }
+
+  /**
+   * Point cut which monitors for the start of the jobtracker and sets the right
+   * value if the jobtracker is started.
+   * 
+   * @param conf
+   * @param jobtrackerIndentifier
+   */
+  pointcut jtConstructorPointCut(JobConf conf, String jobtrackerIndentifier) : 
+        call(JobTracker.new(JobConf,String)) 
+        && args(conf, jobtrackerIndentifier) ;
+
+  after(JobConf conf, String jobtrackerIndentifier) 
+    returning (JobTracker tracker): jtConstructorPointCut(conf, 
+        jobtrackerIndentifier) {
+    tracker.setReady(true);
+  }
+}
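
Note: the around() advice at the end of this aspect intercepts JobTracker.getProtocolVersion so that RPC clients asking for the test protocols receive the framework's version IDs, while every other protocol falls through to the original method via proceed(). A plain-Java sketch of the equivalent dispatch, illustrative only and not part of the commit (defaultVersion stands in for proceed):

  import java.io.IOException;

  import org.apache.hadoop.mapreduce.test.system.JTProtocol;
  import org.apache.hadoop.test.system.DaemonProtocol;

  // Illustrative only: the dispatch performed by the around() advice on getProtocolVersion.
  class ProtocolVersionDispatch {
    long getProtocolVersion(String protocol, long clientVersion) throws IOException {
      if (protocol.equals(DaemonProtocol.class.getName())) {
        return DaemonProtocol.versionID;   // generic test-daemon protocol version
      } else if (protocol.equals(JTProtocol.class.getName())) {
        return JTProtocol.versionID;       // JobTracker-specific test protocol version
      } else {
        return defaultVersion(protocol, clientVersion);  // stands in for proceed(...) in the aspect
      }
    }

    // Hypothetical fallback standing in for the JobTracker's original implementation.
    private long defaultVersion(String protocol, long clientVersion) {
      return clientVersion;
    }
  }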

+ 78 - 0
src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj

@@ -0,0 +1,78 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapred.TTTaskInfoImpl.MapTTTaskInfo;
+import org.apache.hadoop.mapred.TTTaskInfoImpl.ReduceTTTaskInfo;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.test.system.DaemonProtocolAspect;
+
+public privileged aspect TaskTrackerAspect {
+
+  declare parents : TaskTracker implements TTProtocol;
+
+  // Add a last sent status field to the Tasktracker class.
+  TaskTrackerStatus TaskTracker.lastSentStatus = null;
+
+  public synchronized TaskTrackerStatus TaskTracker.getStatus()
+      throws IOException {
+    return lastSentStatus;
+  }
+
+  public Configuration TaskTracker.getDaemonConf() throws IOException {
+    return fConf;
+  }
+
+  public TTTaskInfo[] TaskTracker.getTasks() throws IOException {
+    List<TTTaskInfo> infoList = new ArrayList<TTTaskInfo>();
+    for (TaskInProgress tip : tasks.values()) {
+      TTTaskInfo info = null;
+      if (tip.task.isMapTask()) {
+        info = new MapTTTaskInfo(((MapTask) tip.task), tip.slotTaken,
+            tip.wasKilled, tip.diagnosticInfo.toString());
+      } else {
+        info = new ReduceTTTaskInfo(((ReduceTask) tip.task), tip.slotTaken,
+            tip.wasKilled, tip.diagnosticInfo.toString());
+      }
+      infoList.add(info);
+    }
+    return (TTTaskInfo[]) infoList.toArray(new TTTaskInfo[infoList.size()]);
+  }
+
+  before(TaskTrackerStatus newStatus, TaskTracker tracker) : 
+    set(TaskTrackerStatus TaskTracker.status) 
+    && args(newStatus) && this(tracker) {
+    if (newStatus == null) {
+      tracker.lastSentStatus = tracker.status;
+    }
+  }
+
+  pointcut ttConstructorPointCut(JobConf conf) : 
+    call(TaskTracker.new(JobConf)) 
+    && args(conf);
+
+  after(JobConf conf) returning (TaskTracker tracker): 
+    ttConstructorPointCut(conf) {
+    tracker.setReady(true);
+  }
+  
+  pointcut getVersionAspect(String protocol, long clientVersion) : 
+    execution(public long TaskTracker.getProtocolVersion(String , 
+      long) throws IOException) && args(protocol, clientVersion);
+
+  long around(String protocol, long clientVersion) :  
+    getVersionAspect(protocol, clientVersion) {
+    if(protocol.equals(DaemonProtocol.class.getName())) {
+      return DaemonProtocol.versionID;
+    } else if(protocol.equals(TTProtocol.class.getName())) {
+      return TTProtocol.versionID;
+    } else {
+      return proceed(protocol, clientVersion);
+    }
+  }
+}

+ 143 - 0
src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj

@@ -0,0 +1,143 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Default DaemonProtocolAspect which is used to provide default implementation
+ * for all the common daemon methods. If a daemon requires more specialized
+ * version of method, it is responsibility of the DaemonClient to introduce the
+ * same in woven classes.
+ * 
+ */
+public aspect DaemonProtocolAspect {
+
+  private boolean DaemonProtocol.ready;
+
+  /**
+   * Set if the daemon process is ready or not, concrete daemon protocol should
+   * implement pointcuts to determine when the daemon is ready and use the
+   * setter to set the ready state.
+   * 
+   * @param ready
+   *          true if the Daemon is ready.
+   */
+  public void DaemonProtocol.setReady(boolean ready) {
+    this.ready = ready;
+  }
+
+  /**
+   * Checks if the daemon process is alive or not.
+   * 
+   * @throws IOException
+   *           if daemon is not alive.
+   */
+  public void DaemonProtocol.ping() throws IOException {
+  }
+
+  /**
+   * Checks if the daemon process is ready to accepting RPC connections after it
+   * finishes initialization. <br/>
+   * 
+   * @return true if ready to accept connection.
+   * 
+   * @throws IOException
+   */
+  public boolean DaemonProtocol.isReady() throws IOException {
+    return ready;
+  }
+
+  /**
+   * Returns the process related information regarding the daemon process. <br/>
+   * 
+   * @return process information.
+   * @throws IOException
+   */
+  public ProcessInfo DaemonProtocol.getProcessInfo() throws IOException {
+    int activeThreadCount = Thread.activeCount();
+    long currentTime = System.currentTimeMillis();
+    long maxmem = Runtime.getRuntime().maxMemory();
+    long freemem = Runtime.getRuntime().freeMemory();
+    long totalmem = Runtime.getRuntime().totalMemory();
+    Map<String, String> envMap = System.getenv();
+    Properties sysProps = System.getProperties();
+    Map<String, String> props = new HashMap<String, String>();
+    for (Map.Entry entry : sysProps.entrySet()) {
+      props.put((String) entry.getKey(), (String) entry.getValue());
+    }
+    ProcessInfo info = new ProcessInfoImpl(activeThreadCount, currentTime,
+        freemem, maxmem, totalmem, envMap, props);
+    return info;
+  }
+
+  public void DaemonProtocol.enable(List<Enum<?>> faults) throws IOException {
+  }
+
+  public void DaemonProtocol.disableAll() throws IOException {
+  }
+
+  public abstract Configuration DaemonProtocol.getDaemonConf()
+    throws IOException;
+
+  public FileStatus DaemonProtocol.getFileStatus(String path, boolean local) 
+    throws IOException {
+    Path p = new Path(path);
+    FileSystem fs = getFS(p, local);
+    p.makeQualified(fs);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    return cloneFileStatus(fileStatus);
+  }
+
+  public FileStatus[] DaemonProtocol.listStatus(String path, boolean local) 
+    throws IOException {
+    Path p = new Path(path);
+    FileSystem fs = getFS(p, local);
+    FileStatus[] status = fs.listStatus(p);
+    if (status != null) {
+      FileStatus[] result = new FileStatus[status.length];
+      int i = 0;
+      for (FileStatus fileStatus : status) {
+        result[i++] = cloneFileStatus(fileStatus);
+      }
+      return result;
+    }
+    return status;
+  }
+
+  /**
+   * FileStatus object may not be serializable. Clone it into raw FileStatus 
+   * object.
+   */
+  private FileStatus DaemonProtocol.cloneFileStatus(FileStatus fileStatus) {
+    return new FileStatus(fileStatus.getLen(),
+        fileStatus.isDir(),
+        fileStatus.getReplication(),
+        fileStatus.getBlockSize(),
+        fileStatus.getModificationTime(),
+        fileStatus.getAccessTime(),
+        fileStatus.getPermission(),
+        fileStatus.getOwner(),
+        fileStatus.getGroup(),
+        fileStatus.getPath());
+  }
+
+  private FileSystem DaemonProtocol.getFS(Path path, boolean local) 
+    throws IOException {
+    FileSystem fs = null;
+    if (local) {
+      fs = FileSystem.getLocal(getDaemonConf());
+    } else {
+      fs = path.getFileSystem(getDaemonConf());
+    }
+    return fs;
+  }
+}
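
Note: the ready flag above is only flipped once a daemon-specific constructor pointcut fires (see jtConstructorPointCut and ttConstructorPointCut in the aspects earlier in this commit), so client-side code has to poll isReady() before talking to a freshly started daemon. A minimal polling helper, sketched as an assumption about how the client side might use the protocol (not taken from the commit):

  import java.io.IOException;

  import org.apache.hadoop.test.system.DaemonProtocol;

  // Hypothetical utility: block until a woven daemon reports itself ready, or give up.
  public class DaemonReadiness {
    public static boolean waitForReady(DaemonProtocol daemon, long timeoutMillis)
        throws IOException, InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      while (System.currentTimeMillis() < deadline) {
        if (daemon.isReady()) {   // default implementation injected by DaemonProtocolAspect
          return true;
        }
        Thread.sleep(500);        // poll every half second
      }
      return false;
    }
  }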

+ 197 - 0
src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java

@@ -0,0 +1,197 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+
+/**
+ * Concrete implementation of the JobInfo interface which is exposed to the
+ * clients.
+ * Look at {@link JobInfo} for further details.
+ */
+class JobInfoImpl implements JobInfo {
+
+  private List<String> blackListedTracker;
+  private String historyUrl;
+  private JobID id;
+  private boolean setupLaunched;
+  private boolean setupFinished;
+  private boolean cleanupLaunched;
+  private JobStatus status;
+  private int runningMaps;
+  private int runningReduces;
+  private int waitingMaps;
+  private int waitingReduces;
+  private int finishedMaps;
+  private int finishedReduces;
+  private int numMaps;
+  private int numReduces;
+  private boolean historyCopied;
+
+  public JobInfoImpl() {
+    id = new JobID();
+    status = new JobStatus();
+    blackListedTracker = new LinkedList<String>();
+    historyUrl = "";
+  }
+  
+  public JobInfoImpl(
+      JobID id, boolean setupLaunched, boolean setupFinished,
+      boolean cleanupLaunched, int runningMaps, int runningReduces,
+      int waitingMaps, int waitingReduces, int finishedMaps,
+      int finishedReduces, JobStatus status, String historyUrl,
+      List<String> blackListedTracker, boolean isComplete, int numMaps,
+      int numReduces, boolean historyCopied) {
+    super();
+    this.blackListedTracker = blackListedTracker;
+    this.historyUrl = historyUrl;
+    this.id = id;
+    this.setupLaunched = setupLaunched;
+    this.setupFinished = setupFinished;
+    this.cleanupLaunched = cleanupLaunched;
+    this.status = status;
+    this.runningMaps = runningMaps;
+    this.runningReduces = runningReduces;
+    this.waitingMaps = waitingMaps;
+    this.waitingReduces = waitingReduces;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.numMaps = numMaps;
+    this.numReduces = numReduces;
+    this.historyCopied = historyCopied;
+  }
+
+  @Override
+  public List<String> getBlackListedTrackers() {
+    return blackListedTracker;
+  }
+
+  @Override
+  public String getHistoryUrl() {
+    return historyUrl;
+  }
+
+  @Override
+  public JobID getID() {
+    return id;
+  }
+
+  @Override
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  @Override
+  public boolean isCleanupLaunched() {
+    return cleanupLaunched;
+  }
+
+  @Override
+  public boolean isSetupLaunched() {
+    return setupLaunched;
+  }
+
+  @Override
+  public boolean isSetupFinished() {
+    return setupFinished;
+  }
+
+  @Override
+  public int runningMaps() {
+    return runningMaps;
+  }
+
+  @Override
+  public int runningReduces() {
+    return runningReduces;
+  }
+
+  @Override
+  public int waitingMaps() {
+    return waitingMaps;
+  }
+
+  @Override
+  public int waitingReduces() {
+    return waitingReduces;
+  }
+ 
+  @Override
+  public int finishedMaps() {
+    return finishedMaps;
+  }
+
+  @Override
+  public int finishedReduces() {
+    return finishedReduces;
+  }
+  
+  @Override
+  public int numMaps() {
+    return numMaps;
+  }
+  
+  @Override
+  public int numReduces() {
+    return numReduces;
+  }
+  
+  @Override
+  public boolean isHistoryFileCopied() {
+    return historyCopied;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    id.readFields(in);
+    setupLaunched = in.readBoolean();
+    setupFinished = in.readBoolean();
+    cleanupLaunched = in.readBoolean();
+    status.readFields(in);
+    runningMaps = in.readInt();
+    runningReduces = in.readInt();
+    waitingMaps = in.readInt();
+    waitingReduces = in.readInt();
+    historyUrl = in.readUTF();
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      blackListedTracker.add(in.readUTF());
+    }
+    finishedMaps = in.readInt();
+    finishedReduces = in.readInt();
+    numMaps = in.readInt();
+    numReduces = in.readInt();
+    historyCopied = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    id.write(out);
+    out.writeBoolean(setupLaunched);
+    out.writeBoolean(setupFinished);
+    out.writeBoolean(cleanupLaunched);
+    status.write(out);
+    out.writeInt(runningMaps);
+    out.writeInt(runningReduces);
+    out.writeInt(waitingMaps);
+    out.writeInt(waitingReduces);
+    out.writeUTF(historyUrl);
+    out.writeInt(blackListedTracker.size());
+    for (String str : blackListedTracker) {
+      out.writeUTF(str);
+    }
+    out.writeInt(finishedMaps);
+    out.writeInt(finishedReduces);
+    out.writeInt(numMaps);
+    out.writeInt(numReduces);
+    out.writeBoolean(historyCopied);
+  }
+
+
+}
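
Note: readFields() and write() above must visit the fields in exactly the same order, because the Writable wire format carries no field names. The same contract reduced to a self-contained sketch, using a made-up two-field class and plain java.io streams rather than any Hadoop types:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  // Made-up example: the write/readFields ordering contract in miniature.
  class ExampleInfo {
    private int runningMaps;
    private String historyUrl = "";

    void write(DataOutputStream out) throws IOException {
      out.writeInt(runningMaps);   // field 1
      out.writeUTF(historyUrl);    // field 2 -- same order as readFields
    }

    void readFields(DataInputStream in) throws IOException {
      runningMaps = in.readInt();  // field 1
      historyUrl = in.readUTF();   // field 2
    }

    public static void main(String[] args) throws IOException {
      ExampleInfo original = new ExampleInfo();
      original.runningMaps = 4;
      original.historyUrl = "history://example";

      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      original.write(new DataOutputStream(bytes));

      ExampleInfo copy = new ExampleInfo();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      System.out.println(copy.runningMaps + " " + copy.historyUrl);  // prints: 4 history://example
    }
  }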

+ 54 - 0
src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java

@@ -0,0 +1,54 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+
+/**
+ * Concrete implementation of the TaskTracker information which is passed to 
+ * the client from JobTracker.
+ * Look at {@link TTInfo}
+ */
+
+class TTInfoImpl implements TTInfo {
+
+  private String taskTrackerName;
+  private TaskTrackerStatus status;
+
+  public TTInfoImpl() {
+    taskTrackerName = "";
+    status = new TaskTrackerStatus();
+  }
+  
+  public TTInfoImpl(String taskTrackerName, TaskTrackerStatus status) {
+    super();
+    this.taskTrackerName = taskTrackerName;
+    this.status = status;
+  }
+
+  @Override
+  public String getName() {
+    return taskTrackerName;
+  }
+
+  @Override
+  public TaskTrackerStatus getStatus() {
+    return status;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskTrackerName = in.readUTF();
+    status.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(taskTrackerName);
+    status.write(out);
+  }
+
+}

+ 96 - 0
src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java

@@ -0,0 +1,96 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.MapTask;
+import org.apache.hadoop.mapred.ReduceTask;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+/**
+ * Abstract class which passes the Task view of the TaskTracker to the client.
+ * See {@link TTInfoImpl} for further details.
+ *
+ */
+abstract class TTTaskInfoImpl implements TTTaskInfo {
+
+  private String diagonsticInfo;
+  private Task task;
+  private boolean slotTaken;
+  private boolean wasKilled;
+
+  public TTTaskInfoImpl() {
+  }
+
+  public TTTaskInfoImpl(Task task, boolean slotTaken, boolean wasKilled,
+      String diagonsticInfo) {
+    super();
+    this.diagonsticInfo = diagonsticInfo;
+    this.task = task;
+    this.slotTaken = slotTaken;
+    this.wasKilled = wasKilled;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagonsticInfo;
+  }
+
+  @Override
+  public Task getTask() {
+    return task;
+  }
+
+  @Override
+  public boolean slotTaken() {
+    return slotTaken;
+  }
+
+  @Override
+  public boolean wasKilled() {
+    return wasKilled;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    task.readFields(in);
+    slotTaken = in.readBoolean();
+    wasKilled = in.readBoolean();
+    diagonsticInfo = in.readUTF();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    task.write(out);
+    out.writeBoolean(slotTaken);
+    out.writeBoolean(wasKilled);
+    out.writeUTF(diagonsticInfo);
+  }
+
+  static class MapTTTaskInfo extends TTTaskInfoImpl {
+
+    public MapTTTaskInfo() {
+      super(new MapTask(), false, false, "");
+    }
+
+    public MapTTTaskInfo(MapTask task, boolean slotTaken, boolean wasKilled,
+        String diagonsticInfo) {
+      super(task, slotTaken, wasKilled, diagonsticInfo);
+    }
+  }
+
+  static class ReduceTTTaskInfo extends TTTaskInfoImpl {
+
+    public ReduceTTTaskInfo() {
+      super(new ReduceTask(), false, false, "");
+    }
+
+    public ReduceTTTaskInfo(ReduceTask task, boolean slotTaken,
+        boolean wasKilled, String diagonsticInfo) {
+      super(task, slotTaken, wasKilled, diagonsticInfo);
+    }
+
+  }
+
+}

+ 116 - 0
src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java

@@ -0,0 +1,116 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+
+/**
+ * Concrete class to expose out the task related information to the Clients from
+ * the JobTracker.
+ * Look at {@link TaskInfo} for further details.
+ */
+class TaskInfoImpl implements TaskInfo {
+
+  private double progress;
+  private TaskID taskID;
+  private int killedAttempts;
+  private int failedAttempts;
+  private int runningAttempts;
+  private TaskStatus[] taskStatus;
+
+  public TaskInfoImpl() {
+    taskID = new TaskID();
+  }
+  public TaskInfoImpl(TaskID taskID, double progress, int runningAttempts,
+      int killedAttempts, int failedAttempts, TaskStatus[] taskStatus) {
+    this.progress = progress;
+    this.taskID = taskID;
+    this.killedAttempts = killedAttempts;
+    this.failedAttempts = failedAttempts;
+    this.runningAttempts = runningAttempts;
+    if (taskStatus != null) {
+      this.taskStatus = taskStatus;
+    }
+    else { 
+      if (taskID.isMap()) {
+        this.taskStatus = new MapTaskStatus[]{};
+      }
+      else {
+        this.taskStatus = new ReduceTaskStatus[]{};
+      }
+    }
+    
+  }
+
+  @Override
+  public double getProgress() {
+    return progress;
+  }
+
+  @Override
+  public TaskID getTaskID() {
+    return taskID;
+  }
+
+  @Override
+  public int numKilledAttempts() {
+    return killedAttempts;
+  }
+
+  @Override
+  public int numFailedAttempts() {
+    return failedAttempts;
+  }
+
+  @Override
+  public int numRunningAttempts() {
+    return runningAttempts;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskID.readFields(in);
+    progress = in.readDouble();
+    runningAttempts = in.readInt();
+    killedAttempts = in.readInt();
+    failedAttempts = in.readInt();
+    int size = in.readInt();
+    if (taskID.isMap()) {
+      taskStatus = new MapTaskStatus[size];
+    }
+    else {
+      taskStatus = new ReduceTaskStatus[size];
+    }
+    for (int i = 0; i < size; i++) {
+      if (taskID.isMap()) {
+        taskStatus[i] = new MapTaskStatus();
+      }
+      else {
+        taskStatus[i] = new ReduceTaskStatus();
+      }
+      taskStatus[i].readFields(in);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskID.write(out);
+    out.writeDouble(progress);
+    out.writeInt(runningAttempts);
+    out.writeInt(killedAttempts);
+    out.writeInt(failedAttempts);
+    out.writeInt(taskStatus.length);
+    for (TaskStatus t : taskStatus) {
+      t.write(out);
+    }
+  }
+  
+  @Override
+  public TaskStatus[] getTaskStatus() {
+    return taskStatus;
+  }
+}

+ 101 - 0
src/test/system/java/org/apache/hadoop/mapred/TestCluster.java

@@ -0,0 +1,101 @@
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestCluster.class);
+
+  private static MRCluster cluster;
+
+  public TestCluster() throws Exception {
+    
+  }
+
+  @BeforeClass
+  public static void before() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  @Test
+  public void testProcessInfo() throws Exception {
+    LOG.info("Process info of master is : "
+        + cluster.getMaster().getProcessInfo());
+    Assert.assertNotNull(cluster.getMaster().getProcessInfo());
+    Collection<TTClient> slaves = cluster.getSlaves().values();
+    for (TTClient slave : slaves) {
+      LOG.info("Process info of slave is : " + slave.getProcessInfo());
+      Assert.assertNotNull(slave.getProcessInfo());
+    }
+  }
+  
+  @Test
+  public void testJobSubmission() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getMaster().getProxy();
+    JobInfo[] jobs = wovenClient.getAllJobInfo();
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    conf = job.setupJobConf(1, 1, 100, 100, 100, 100);
+    RunningJob rJob = cluster.getMaster().submitAndVerifyJob(conf);
+    cluster.getMaster().verifyJobHistory(rJob.getID());
+  }
+
+  @Test
+  public void testFileStatus() throws Exception {
+    JTClient jt = cluster.getMaster();
+    String dir = ".";
+    checkFileStatus(jt.getFileStatus(dir, true));
+    checkFileStatus(jt.listStatus(dir, false, true), dir);
+    for (TTClient tt : cluster.getSlaves().values()) {
+      String[] localDirs = tt.getMapredLocalDirs();
+      for (String localDir : localDirs) {
+        checkFileStatus(tt.listStatus(localDir, true, false), localDir);
+        checkFileStatus(tt.listStatus(localDir, true, true), localDir);
+      }
+    }
+    String systemDir = jt.getClient().getSystemDir().toString();
+    checkFileStatus(jt.listStatus(systemDir, false, true), systemDir);
+    checkFileStatus(jt.listStatus(jt.getLogDir(), true, true), jt.getLogDir());
+  }
+
+  private void checkFileStatus(FileStatus[] fs, String path) {
+    Assert.assertNotNull(fs);
+    LOG.info("-----Listing for " + path + "  " + fs.length);
+    for (FileStatus fz : fs) {
+      checkFileStatus(fz);
+    }
+  }
+
+  private void checkFileStatus(FileStatus fz) {
+    Assert.assertNotNull(fz);
+    LOG.info("FileStatus is " + fz.getPath() 
+        + "  " + fz.getPermission()
+        +"  " + fz.getOwner()
+        +"  " + fz.getGroup()
+        +"  " + fz.getClass());
+  }
+
+}

+ 181 - 0
src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java

@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.examples.Sort;
+
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A system test for the Map-Reduce framework's sort,
+ * run against a real Map-Reduce cluster.
+ */
+public class TestSortValidate {
+  // Input/Output paths for sort
+  private static final Path SORT_INPUT_PATH = new Path("inputDirectory");
+  private static final Path SORT_OUTPUT_PATH = new Path("outputDirectory");
+
+  // make it big enough to cause a spill in the map
+  private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
+  private static final int RW_MAPS_PER_HOST = 2;
+
+  private MRCluster cluster = null;
+  private FileSystem dfs = null;
+  private JobClient client = null;
+
+  private static final Log LOG = LogFactory.getLog(TestSortValidate.class);
+
+  public TestSortValidate()
+  throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+  }
+
+  @Before
+  public void setUp() throws java.lang.Exception {
+    cluster.setUp();
+    client = cluster.getMaster().getClient();
+
+    dfs = client.getFs();
+    dfs.delete(SORT_INPUT_PATH, true);
+    dfs.delete(SORT_OUTPUT_PATH, true);
+  }
+
+  @After
+  public void after() throws Exception {
+    cluster.tearDown();
+    dfs.delete(SORT_INPUT_PATH, true);
+    dfs.delete(SORT_OUTPUT_PATH, true);
+  }
+
+  public void runRandomWriter(Configuration job, Path sortInput) 
+  throws Exception {
+    // Scale down the default settings for RandomWriter for the test-case
+    // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
+    job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
+    job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
+    String[] rwArgs = {sortInput.toString()};
+ 
+    runAndVerify(job,new RandomWriter(), rwArgs);
+  }
+
+  private void runAndVerify(Configuration job, Tool tool, String[] args)
+    throws Exception {
+
+    // This records the number of jobs submitted before the new
+    // job gets submitted.
+    int prevJobsNum = 0;
+
+    // JTProtocol wovenClient
+    JTProtocol wovenClient = cluster.getMaster().getProxy();
+
+    // JobStatus
+    JobStatus[] jobStatus = null;
+
+    // JobID
+    JobID id = null;
+
+    // RunningJob rJob;
+    RunningJob rJob = null;
+
+    // JobInfo jInfo;
+    JobInfo jInfo = null;
+
+    // Get the number of jobs submitted so far.
+    jobStatus = client.getAllJobs();
+    prevJobsNum = jobStatus.length;
+
+    // Run the tool and expect a successful (zero) exit status.
+    Assert.assertEquals(ToolRunner.run(job, tool, args), 0);
+
+    //Waiting for the job to appear in the jobstatus
+    jobStatus = client.getAllJobs();
+
+    while (jobStatus.length - prevJobsNum == 0) {
+      LOG.info("Waiting for the job to appear in the jobStatus");
+      Thread.sleep(1000);
+      jobStatus = client.getAllJobs();
+    }
+
+    //Getting the jobId of the just submitted job
+    //The just submitted job is always added in the first slot of jobstatus
+    id = jobStatus[0].getJobID();
+
+    rJob = client.getJob(id);
+
+    jInfo = wovenClient.getJobInfo(id);
+
+    //Making sure that the job is complete.
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(10000);
+      jInfo = wovenClient.getJobInfo(id);
+    }
+
+    cluster.getMaster().verifyCompletedJob(id);
+  }
+  
+  private void runSort(Configuration job, Path sortInput, Path sortOutput) 
+  throws Exception {
+
+    job.setInt("io.sort.mb", 1);
+
+    // Setup command-line arguments to 'sort'
+    String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
+    
+    runAndVerify(job,new Sort(), sortArgs);
+
+  }
+  
+  private void runSortValidator(Configuration job, 
+                                       Path sortInput, Path sortOutput) 
+  throws Exception {
+    String[] svArgs = {"-sortInput", sortInput.toString(), 
+                       "-sortOutput", sortOutput.toString()};
+
+    runAndVerify(job,new SortValidator(), svArgs);
+
+  }
+ 
+  @Test 
+  public void testMapReduceSort() throws Exception {
+    // Run randomwriter to generate input for 'sort'
+    runRandomWriter(cluster.getConf(), SORT_INPUT_PATH);
+
+    // Run sort
+    runSort(cluster.getConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+    // Run sort-validator to check if sort worked correctly
+    runSortValidator(cluster.getConf(), SORT_INPUT_PATH, 
+                     SORT_OUTPUT_PATH);
+  }
+}

+ 129 - 0
src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java

@@ -0,0 +1,129 @@
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.examples.WordCount.IntSumReducer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import testjar.UserNamePermission;
+
+public class TestTaskOwner {
+  private static final Log LOG = LogFactory.getLog(TestTaskOwner.class);
+  private static Path outDir = new Path("output");
+  private static Path inDir = new Path("input");
+  public static MRCluster cluster;
+
+  // The role of this job is to write the user name to the output file,
+  // which is then parsed by the test.
+
+  @BeforeClass
+  public static void setUp() throws java.lang.Exception {
+
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    FileSystem fs = inDir.getFileSystem(cluster.getMaster().getConf());
+    fs.create(inDir);
+  }
+
+  @Test
+  public void testProcessPermission() throws Exception {
+  // The user submits a plain old map-reduce job. The job writes out the
+  // user name of the task that runs in the cluster, and the test then
+  // verifies that this name matches the user name under which the job
+  // was submitted.
+
+    Configuration conf = cluster.getMaster().getConf();
+    Job job = new Job(conf, "user name check");
+
+    job.setJarByClass(UserNamePermission.class);
+    job.setMapperClass(UserNamePermission.UserNameMapper.class);
+    job.setCombinerClass(UserNamePermission.UserNameReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setReducerClass(UserNamePermission.UserNameReducer.class);
+    job.setNumReduceTasks(1);
+
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    job.waitForCompletion(true);
+
+    // Now verify that the user name written out by the task is the
+    // same as the user name that was used to launch the task in the
+    // first place.
+    FileSystem fs = outDir.getFileSystem(conf);
+    StringBuffer result = new StringBuffer();
+
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+     new Utils.OutputFileUtils.OutputFilesFilter()));
+
+    for (int i = 0; i < fileList.length; ++i) {
+      LOG.info("File list[" + i + "]" + ": " + fileList[i]);
+      BufferedReader file = new BufferedReader(new InputStreamReader(fs
+          .open(fileList[i])));
+      String line = file.readLine();
+      while (line != null) {
+        StringTokenizer token = new StringTokenizer(line);
+        if (token.hasMoreTokens()) {
+          LOG.info("First token " + token.nextToken());
+          String userName = token.nextToken();
+          LOG.info("Next token " + userName);
+          Assert.assertEquals(
+              "The task was not run by the user who submitted the job",
+              System.getProperty("user.name"), userName);
+          break;
+        }
+        // Move on to the next line of the output file.
+        line = file.readLine();
+      }
+      file.close();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws java.lang.Exception {
+    FileSystem fs = outDir.getFileSystem(cluster.getMaster().getConf());
+    fs.delete(outDir, true);
+    cluster.tearDown();
+   }
+
+}
+
+

+ 299 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java

@@ -0,0 +1,299 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import static org.junit.Assert.*;
+
+/**
+ * JobTracker client for system tests.
+ */
+public class JTClient extends MRDaemonClient<JTProtocol> {
+  static final Log LOG = LogFactory.getLog(JTClient.class);
+  private JobClient client;
+
+  /**
+   * Create JobTracker client to talk to {@link JobTracker} specified in the
+   * configuration. <br/>
+   * 
+   * @param conf
+   *          configuration used to create a client.
+   * @param daemon
+   *          the process management instance for the {@link JobTracker}
+   * @throws IOException
+   */
+  public JTClient(Configuration conf, RemoteProcess daemon) throws 
+    IOException {
+    super(conf, daemon);
+  }
+
+  @Override
+  public synchronized void connect() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    client = new JobClient(new JobConf(getConf()));
+    setConnected(true);
+  }
+
+  @Override
+  public synchronized void disconnect() throws IOException {
+    client.close();
+  }
+
+  @Override
+  public synchronized JTProtocol getProxy() {
+    return (JTProtocol) client.getProtocol();
+  }
+
+  /**
+   * Gets the {@link JobClient} which can be used for job submission. The
+   * returned JobClient does not contain the decorated (woven) APIs and is
+   * meant purely for submitting jobs.
+   * 
+   * @return client handle to the JobTracker
+   */
+  public JobClient getClient() {
+    return client;
+  }
+
+  /**
+   * Gets the configuration with which the JobTracker is currently running.<br/>
+   * 
+   * @return configuration of JobTracker.
+   * 
+   * @throws IOException
+   */
+  public Configuration getJobTrackerConfig() throws IOException {
+    return getProxy().getDaemonConf();
+  }
+
+  /**
+   * Verification API to check running jobs and their states.
+   * Users have to ensure that their jobs remain in running state while
+   * this verification is invoked. <br/>
+   * 
+   * @param jobId
+   *          of the job to be verified.
+   * 
+   * @throws Exception
+   */
+  public void verifyRunningJob(JobID jobId) throws Exception {
+  }
+
+  private boolean checkJobValidityForProceeding(JobID jobId, JobInfo jobInfo)
+  throws IOException {
+    if (jobInfo != null) {
+      return true;
+    } else if (jobInfo == null && !getProxy().isJobRetired(jobId)) {
+      Assert.fail("Job id : " + jobId + " has never been submitted to JT");
+    }
+    return false;
+  }
+  
+  /**
+   * Submits the job with the given configuration and verifies its state
+   * while it runs and after it completes.
+   * <br/>
+   * @param conf configuration of the job to be submitted
+   * @return handle to the running job
+   * @throws Exception
+   */
+  public RunningJob submitAndVerifyJob(Configuration conf) throws Exception {
+    JobConf jconf = new JobConf(conf);
+    RunningJob rJob = getClient().submitJob(jconf);
+    JobID jobId = rJob.getID();
+    verifyRunningJob(jobId);
+    verifyCompletedJob(jobId);
+    return rJob;
+  }
+  
+  /**
+   * Verification API to check if the job completion state is correct. <br/>
+   * 
+   * @param id id of the job to be verified.
+   */
+  
+  public void verifyCompletedJob(JobID id) throws Exception{
+    RunningJob rJob = getClient().getJob(
+        org.apache.hadoop.mapred.JobID.downgrade(id));
+    while(!rJob.isComplete()) {
+      LOG.info("waiting for job :" + id + " to retire");
+      Thread.sleep(1000);
+      rJob = getClient().getJob(
+          org.apache.hadoop.mapred.JobID.downgrade(id));
+    }
+    verifyJobDetails(id);
+    JobInfo jobInfo = getProxy().getJobInfo(id);
+    if (jobInfo == null && !getProxy().isJobRetired(id)) {
+      Assert.fail("The passed job id : " + id +
+          " is not submitted to JT.");
+    }
+    // A retired job has no in-memory JobInfo left, so only wait for the
+    // history file while the job is still present in the JobTracker.
+    while (jobInfo != null && !jobInfo.isHistoryFileCopied()) {
+      Thread.sleep(1000);
+      LOG.info(id + " waiting for history file to be copied");
+      jobInfo = getProxy().getJobInfo(id);
+    }
+    verifyJobHistory(id);
+  }
+
+  /**
+   * Verification API to check if the job details are semantically correct.<br/>
+   * 
+   * @param jobId
+   *          id of the job to be verified.
+   * @throws Exception
+   */
+  public void verifyJobDetails(JobID jobId) throws Exception {
+    // wait till the setup is launched and finished.
+    JobInfo jobInfo = getProxy().getJobInfo(jobId);
+    if(!checkJobValidityForProceeding(jobId, jobInfo)){
+      return;
+    }
+    LOG.info("waiting for the setup to be finished");
+    while (!jobInfo.isSetupFinished()) {
+      Thread.sleep(2000);
+      jobInfo = getProxy().getJobInfo(jobId);
+    }
+    // verify job id.
+    assertTrue(jobId.toString().startsWith("job_"));
+    LOG.info("verified job id and is : " + jobId.toString());
+    // verify the number of map/reduce tasks.
+    verifyNumTasks(jobId);
+    // should verify job progress.
+    verifyJobProgress(jobId);
+    jobInfo = getProxy().getJobInfo(jobId);
+    if (jobInfo.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+      // verify if map/reduce progress reached 1.
+      jobInfo = getProxy().getJobInfo(jobId);
+      checkJobValidityForProceeding(jobId, jobInfo);
+      assertEquals(1.0, jobInfo.getStatus().mapProgress(), 0.001);
+      assertEquals(1.0, jobInfo.getStatus().reduceProgress(), 0.001);
+      // verify successful finish of tasks.
+      verifyAllTasksSuccess(jobId);
+    }
+    if (jobInfo.getStatus().isJobComplete()) {
+      // verify if the cleanup is launched.
+      jobInfo = getProxy().getJobInfo(jobId);
+      checkJobValidityForProceeding(jobId, jobInfo);
+      assertTrue(jobInfo.isCleanupLaunched());
+      LOG.info("Verified launching of cleanup");
+    }
+  }
+
+  
+  public void verifyAllTasksSuccess(JobID jobId) throws IOException {
+    JobInfo jobInfo = getProxy().getJobInfo(jobId);
+    
+    if(!checkJobValidityForProceeding(jobId, jobInfo)){ 
+      return;
+    }
+    
+    TaskInfo[] taskInfos = getProxy().getTaskInfo(jobId);
+    
+    if(taskInfos.length == 0 && getProxy().isJobRetired(jobId)) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      return;
+    }
+    
+    for (TaskInfo taskInfo : taskInfos) {
+      TaskStatus[] taskStatus = taskInfo.getTaskStatus();
+      if (taskStatus != null && taskStatus.length > 0) {
+        int i;
+        for (i = 0; i < taskStatus.length; i++) {
+          if (TaskStatus.State.SUCCEEDED.equals(taskStatus[i].getRunState())) {
+            break;
+          }
+        }
+        assertFalse(i == taskStatus.length);
+      }
+    }
+    LOG.info("verified that none of the tasks failed.");
+  }
+  
+  public void verifyJobProgress(JobID jobId) throws IOException {
+    JobInfo jobInfo;
+    jobInfo = getProxy().getJobInfo(jobId);
+    if(!checkJobValidityForProceeding(jobId, jobInfo)){
+      return;
+    }
+    assertTrue(jobInfo.getStatus().mapProgress() >= 0 && jobInfo.getStatus()
+        .mapProgress() <= 1);
+    LOG.info("verified map progress and is "
+        + jobInfo.getStatus().mapProgress());    
+    assertTrue(jobInfo.getStatus().reduceProgress() >= 0 && jobInfo.getStatus()
+        .reduceProgress() <= 1);
+    LOG.info("verified reduce progress and is "
+        + jobInfo.getStatus().reduceProgress());
+  }
+  
+  public void verifyNumTasks(JobID jobId) throws IOException {
+    JobInfo jobInfo;
+    jobInfo = getProxy().getJobInfo(jobId);
+    if(!checkJobValidityForProceeding(jobId, jobInfo)) {
+      return;
+    }
+    assertEquals(jobInfo.numMaps(), (jobInfo.runningMaps()
+        + jobInfo.waitingMaps() + jobInfo.finishedMaps()));
+    LOG.info("verified number of map tasks and is " + jobInfo.numMaps());
+    
+    assertEquals(jobInfo.numReduces(),  (jobInfo.runningReduces()
+        + jobInfo.waitingReduces() + jobInfo.finishedReduces()));
+    LOG.info("verified number of reduce tasks and is "
+        + jobInfo.numReduces());
+  }
+
+  /**
+   * Verification API to check if the job history file is semantically correct.
+   * <br/>
+   * 
+   * @param jobId
+   *          of the job to be verified.
+   * @throws IOException
+   */
+  public void verifyJobHistory(JobID jobId) throws IOException {
+    JobInfo info = getProxy().getJobInfo(jobId);
+    String url = "";
+    if(info == null && !getProxy().isJobRetired(jobId)) {
+      Assert.fail("Job id : " + jobId + 
+          " has never been submitted to JT");
+    } else if(info == null) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      url = getProxy().getJobHistoryLocationForRetiredJob(jobId);
+    } else {
+      url = info.getHistoryUrl();
+    }
+    Path p = new Path(url);
+    // A local history location has the URI scheme "file"; check it on the
+    // local file system, otherwise go through the default file system.
+    boolean local = "file".equals(p.toUri().getScheme());
+    FileStatus st = getFileStatus(url, local);
+    Assert.assertNotNull("Job History file for " + jobId + " not present " +
+        "when job is completed", st);
+    LOG.info("Verified the job history for the jobId : " + jobId);
+  }
+}

+ 91 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java

@@ -0,0 +1,91 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * Client side API's exposed from JobTracker.
+ */
+public interface JTProtocol extends DaemonProtocol {
+  long versionID = 1L;
+
+  /**
+   * Gets the information pertaining to the given job.<br/>
+   * 
+   * @param jobID
+   *          of the job for which information is required.
+   * @return information regarding the job.
+   * @throws IOException
+   */
+  public JobInfo getJobInfo(JobID jobID) throws IOException;
+
+  /**
+   * Gets the information pertaining to a task. <br/>
+   * 
+   * @param taskID
+   *          of the task for which information is required.
+   * @return information regarding the task.
+   * @throws IOException
+   */
+  public TaskInfo getTaskInfo(TaskID taskID) throws IOException;
+
+  /**
+   * Gets the information pertaining to a given TaskTracker. <br/>
+   * 
+   * @param trackerName
+   *          of the tracker.
+   * @return information regarding the tracker.
+   * @throws IOException
+   */
+  public TTInfo getTTInfo(String trackerName) throws IOException;
+
+  /**
+   * Gets a list of all the jobs available with the JobTracker.<br/>
+   * 
+   * @return list of all jobs.
+   * @throws IOException
+   */
+  public JobInfo[] getAllJobInfo() throws IOException;
+
+  /**
+   * Gets a list of tasks pertaining to a job. <br/>
+   * 
+   * @param jobID
+   *          of the job.
+   * 
+   * @return list of all tasks for the job.
+   * @throws IOException
+   */
+  public TaskInfo[] getTaskInfo(JobID jobID) throws IOException;
+
+  /**
+   * Gets a list of TaskTrackers which have reported to the JobTracker. <br/>
+   * 
+   * @return list of all the TaskTrackers.
+   * @throws IOException
+   */
+  public TTInfo[] getAllTTInfo() throws IOException;
+
+  /**
+   * Checks if a given job has been retired from the JobTracker's memory. <br/>
+   * 
+   * @param jobID
+   *          of the job.
+   * @return true if the job has been retired.
+   * @throws IOException
+   */
+  boolean isJobRetired(JobID jobID) throws IOException;
+
+  /**
+   * Gets the location of the history file for a retired job. <br/>
+   * 
+   * @param jobID
+   *          of the job.
+   * @return location of history file
+   * @throws IOException
+   */
+  String getJobHistoryLocationForRetiredJob(JobID jobID) throws IOException;
+}

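A rough sketch, not part of this patch, of how a test might walk the JTProtocol interface once a cluster handle has been obtained the same way TestCluster does above; the class and variable names below are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.JTProtocol;
    import org.apache.hadoop.mapreduce.test.system.JobInfo;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;
    import org.apache.hadoop.mapreduce.test.system.TTInfo;
    import org.apache.hadoop.mapreduce.test.system.TaskInfo;

    public class JTProtocolSketch {
      public static void main(String[] args) throws Exception {
        MRCluster cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();
        JTProtocol jt = cluster.getMaster().getProxy();
        // Walk every job known to the JobTracker and its tasks.
        for (JobInfo job : jt.getAllJobInfo()) {
          System.out.println(job.getID() + " running maps: " + job.runningMaps());
          for (TaskInfo task : jt.getTaskInfo(job.getID())) {
            System.out.println("  " + task.getTaskID() + " progress: "
                + task.getProgress());
          }
        }
        // Trackers that have reported to the JobTracker.
        for (TTInfo tt : jt.getAllTTInfo()) {
          System.out.println("tracker: " + tt.getName());
        }
        cluster.tearDown();
      }
    }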
+ 121 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java

@@ -0,0 +1,121 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Job state information as seen by the JobTracker.
+ */
+public interface JobInfo extends Writable {
+  /**
+   * Gets the JobId of the job.<br/>
+   * 
+   * @return id of the job.
+   */
+  JobID getID();
+
+  /**
+   * Gets the current status of the job.<br/>
+   * 
+   * @return status.
+   */
+  JobStatus getStatus();
+
+  /**
+   * Gets the history location of the job.<br/>
+   * 
+   * @return the path to the history file.
+   */
+  String getHistoryUrl();
+
+  /**
+   * Gets the number of maps which are currently running for the job. <br/>
+   * 
+   * @return number of maps running for the job.
+   */
+  int runningMaps();
+
+  /**
+   * Gets the number of reduces currently running for the job. <br/>
+   * 
+   * @return number of reduces running for the job.
+   */
+  int runningReduces();
+
+  /**
+   * Gets the number of maps to be scheduled for the job. <br/>
+   * 
+   * @return number of waiting maps.
+   */
+  int waitingMaps();
+
+  /**
+   * Gets the number of reduces to be scheduled for the job. <br/>
+   * 
+   * @return number of waiting reduces.
+   */
+  int waitingReduces();
+  
+  /**
+   * Gets the number of maps that are finished. <br/>
+   * @return the number of finished maps.
+   */
+  int finishedMaps();
+  
+  /**
+   * Gets the number of map tasks that are to be spawned for the job. <br/>
+   * @return the total number of map tasks.
+   */
+  int numMaps();
+  
+  /**
+   * Gets the number of reduce tasks that are to be spawned for the job. <br/>
+   * @return the total number of reduce tasks.
+   */
+  int numReduces();
+  
+  /**
+   * Gets the number of reduces that are finished. <br/>
+   * @return the number of finished reduces.
+   */
+  int finishedReduces();
+
+  /**
+   * Gets if cleanup for the job has been launched.<br/>
+   * 
+   * @return true if cleanup task has been launched.
+   */
+  boolean isCleanupLaunched();
+
+  /**
+   * Gets if the setup for the job has been launched.<br/>
+   * 
+   * @return true if setup task has been launched.
+   */
+  boolean isSetupLaunched();
+
+  /**
+   * Gets if the setup for the job has been completed.<br/>
+   * 
+   * @return true if the setup task for the job has completed.
+   */
+  boolean isSetupFinished();
+
+  /**
+   * Gets list of blacklisted trackers for the particular job. <br/>
+   * 
+   * @return list of blacklisted tracker name.
+   */
+  List<String> getBlackListedTrackers();
+  
+  /**
+   * Gets if the history file of the job is copied to the done 
+   * location <br/>
+   * 
+   * @return true if history file copied.
+   */
+  boolean isHistoryFileCopied();
+}

+ 72 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java

@@ -0,0 +1,72 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.AbstractMasterSlaveCluster;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
+
+/**
+ * Concrete MasterSlaveCluster representing a Map-Reduce cluster.
+ * 
+ */
+public class MRCluster extends AbstractMasterSlaveCluster<JTClient, 
+      TTClient> {
+
+  private static final Log LOG = LogFactory.getLog(MRCluster.class);
+
+  private MRCluster(Configuration conf, ClusterProcessManager rCluster)
+      throws IOException {
+    super(conf, rCluster);
+  }
+
+  /**
+   * Creates an instance of the Map-Reduce cluster.<br/>
+   * Example usage: <br/>
+   * <code>
+   * Configuration conf = new Configuration();<br/>
+   * conf.set(ClusterProcessManager.IMPL_CLASS,
+   * org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.
+   * class.getName())<br/>
+   * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME,
+   * "/path");<br/>
+   * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR,
+   * "/path");<br/>
+   * MRCluster cluster = MRCluster.createCluster(conf);
+   * </code>
+   * 
+   * @param conf
+   *          contains all required parameter to create cluster.
+   * @return a cluster instance to be managed.
+   * @throws IOException
+   * @throws Exception
+   */
+  public static MRCluster createCluster(Configuration conf) 
+      throws IOException, Exception {
+    return new MRCluster(conf, ClusterProcessManagerFactory.createInstance(
+        ClusterType.MAPRED, conf));
+  }
+
+  @Override
+  protected JTClient createMaster(RemoteProcess masterDaemon)
+      throws IOException {
+    return new JTClient(getConf(), masterDaemon);
+  }
+
+  @Override
+  protected TTClient createSlave(RemoteProcess slaveDaemon) 
+      throws IOException {
+    return new TTClient(getConf(), slaveDaemon);
+  }
+
+  @Override
+  public void ensureClean() throws IOException {
+    //TODO: ensure that no jobs/tasks are running
+    //restart the cluster if cleanup fails
+  }
+}

+ 28 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java

@@ -0,0 +1,28 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Base class for JobTracker and TaskTracker clients.
+ */
+public abstract class MRDaemonClient<PROXY extends DaemonProtocol> 
+    extends AbstractDaemonClient<PROXY>{
+
+  public MRDaemonClient(Configuration conf, RemoteProcess process)
+      throws IOException {
+    super(conf, process);
+  }
+
+  public String[] getMapredLocalDirs() throws IOException {
+    return getProxy().getDaemonConf().getStrings("mapred.local.dir");
+  }
+
+  public String getLogDir() throws IOException {
+    return getProcessInfo().getSystemProperties().get("hadoop.log.dir");
+  }
+}

+ 12 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java

@@ -0,0 +1,12 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+/**
+ * Fault injection types. At a given time any of these faults (0 or more) 
+ * can be injected. 
+ * @see AbstractMasterSlaveCluster#enable(List<Enum>)
+ * @see AbstractMasterSlaveCluster#disable(List<Enum>)
+ */
+public enum MRFault {
+  BAD_NODE_HEALTH,
+  STALL_HEARTBEAT
+}

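Nothing in this patch exercises these fault types yet. As a rough sketch, not part of the change, a test could push them through the enable/disableAllFaults hooks that the cluster abstraction later in this patch provides, roughly like this (class name is illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;
    import org.apache.hadoop.mapreduce.test.system.MRFault;

    public class FaultInjectionSketch {
      public static void main(String[] args) throws Exception {
        MRCluster cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();
        // Inject a stalled heartbeat on every daemon; the cluster is
        // restarted and injection retried if any daemon rejects the fault.
        List<Enum<?>> faults = new ArrayList<Enum<?>>();
        faults.add(MRFault.STALL_HEARTBEAT);
        cluster.enable(faults);
        // ... run the scenario that should survive the stalled heartbeats ...
        // Remove every injected fault before handing the cluster back.
        cluster.disableAllFaults();
        cluster.tearDown();
      }
    }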
+ 72 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java

@@ -0,0 +1,72 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * TaskTracker client for system tests. The class assumes that the
+ * configuration key {@code mapred.task.tracker.report.address} is set;
+ * only the port portion of the address is used.
+ */
+public class TTClient extends MRDaemonClient<TTProtocol> {
+
+  TTProtocol proxy;
+
+  public TTClient(Configuration conf, RemoteProcess daemon) 
+      throws IOException {
+    super(conf, daemon);
+  }
+
+  @Override
+  public synchronized void connect() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    String sockAddrStr = getConf()
+        .get("mapred.task.tracker.report.address");
+    if (sockAddrStr == null) {
+      throw new IllegalArgumentException(
+          "TaskTracker report address is not set");
+    }
+    String[] splits = sockAddrStr.split(":");
+    if (splits.length != 2) {
+      throw new IllegalArgumentException(
+          "TaskTracker report address not correctly configured");
+    }
+    String port = splits[1];
+    String sockAddr = getHostName() + ":" + port;
+    InetSocketAddress bindAddr = NetUtils.createSocketAddr(sockAddr);
+    proxy = (TTProtocol) RPC.getProxy(TTProtocol.class, TTProtocol.versionID,
+        bindAddr, getConf());
+    setConnected(true);
+  }
+
+  @Override
+  public synchronized void disconnect() throws IOException {
+    RPC.stopProxy(proxy);
+  }
+
+  @Override
+  public synchronized TTProtocol getProxy() {
+    return proxy;
+  }
+
+  /**
+   * Gets the status last sent to the {@link JobTracker}. <br/>
+   * 
+   * @return the task tracker status.
+   * @throws IOException
+   */
+  public TaskTrackerStatus getStatus() throws IOException {
+    return getProxy().getStatus();
+  }
+
+}

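TTClient only reads the port out of mapred.task.tracker.report.address and pairs it with the host of the managed daemon. A hedged sketch of the wiring follows; the RemoteProcess handle and the address value are placeholders, not values mandated by the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.TaskTrackerStatus;
    import org.apache.hadoop.mapreduce.test.system.TTClient;
    import org.apache.hadoop.test.system.process.RemoteProcess;

    public class TTClientSketch {
      // daemon would normally come from ClusterProcessManager.getSlaves().
      static TTClient connectToTracker(Configuration conf, RemoteProcess daemon)
          throws Exception {
        // Placeholder value; only the ":50060" port part is used, the host
        // is taken from the daemon's RemoteProcess.
        conf.set("mapred.task.tracker.report.address", "0.0.0.0:50060");
        TTClient tt = new TTClient(conf, daemon);
        tt.connect();
        TaskTrackerStatus status = tt.getStatus();
        System.out.println("Tracker " + status.getTrackerName()
            + " last reported " + status.countMapTasks() + " map tasks");
        return tt;
      }
    }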
+ 24 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java

@@ -0,0 +1,24 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+
+/**
+ * TaskTracker state information as seen by the JobTracker.
+ */
+public interface TTInfo extends Writable {
+  /**
+   * Gets the {@link TaskTracker} name.<br/>
+   * 
+   * @return name of the tracker.
+   */
+  String getName();
+
+  /**
+   * Gets the current status of the {@link TaskTracker} <br/>
+   * 
+   * @return status of the {@link TaskTracker}
+   */
+  TaskTrackerStatus getStatus();
+}

+ 32 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java

@@ -0,0 +1,32 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * TaskTracker RPC interface to be used for cluster tests.
+ */
+public interface TTProtocol extends DaemonProtocol {
+
+  public static final long versionID = 1L;
+  /**
+   * Gets latest status which was sent in heartbeat to the {@link JobTracker}. 
+   * <br/>
+   * 
+   * @return status
+   * @throws IOException
+   */
+  TaskTrackerStatus getStatus() throws IOException;
+
+  /**
+   * Gets list of all the tasks in the {@link TaskTracker}.<br/>
+   * 
+   * @return list of all the tasks
+   * @throws IOException
+   */
+  TTTaskInfo[] getTasks() throws IOException;
+}

+ 40 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java

@@ -0,0 +1,40 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Task state information as seen by the TT.
+ */
+public interface TTTaskInfo extends Writable {
+  /**
+   * Gets the task associated to the instance as seen by {@link TaskTracker}
+   * <br/>
+   * 
+   * @return task.
+   */
+  Task getTask();
+
+  /**
+   * Gets the diagnostic information associated with the task.<br/>
+   * 
+   * @return diagnostic information of the task.
+   */
+  String getDiagnosticInfo();
+
+  /**
+   * Has task occupied a slot? A task occupies a slot once it starts localizing
+   * on the {@link TaskTracker} <br/>
+   * 
+   * @return true if task has started occupying a slot.
+   */
+  boolean slotTaken();
+
+  /**
+   * Has the task been killed? <br/>
+   * 
+   * @return true, if task has been killed.
+   */
+  boolean wasKilled();
+}

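A short sketch, not part of this patch, of how TTProtocol.getTasks() and the TTTaskInfo view above might be used to inspect what each TaskTracker is running; the cluster handle is assumed to come from MRCluster.createCluster as in the tests earlier in this change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;
    import org.apache.hadoop.mapreduce.test.system.TTClient;
    import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;

    public class TTTaskInfoSketch {
      public static void main(String[] args) throws Exception {
        MRCluster cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();
        for (TTClient tt : cluster.getSlaves().values()) {
          // Every task the TaskTracker currently knows about.
          for (TTTaskInfo task : tt.getProxy().getTasks()) {
            System.out.println(tt.getHostName()
                + " slotTaken=" + task.slotTaken()
                + " wasKilled=" + task.wasKilled()
                + " diagnostics=" + task.getDiagnosticInfo());
          }
        }
        cluster.tearDown();
      }
    }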
+ 57 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java

@@ -0,0 +1,57 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Task state information of a TaskInProgress as seen by the {@link JobTracker}
+ */
+public interface TaskInfo extends Writable {
+  /**
+   * Gets the task id of the TaskInProgress.
+   * 
+   * @return id of the task.
+   */
+  TaskID getTaskID();
+
+  /**
+   * Number of times task attempts have failed for the given TaskInProgress.
+   * <br/>
+   * 
+   * @return number of failed task attempts.
+   */
+  int numFailedAttempts();
+
+  /**
+   * Number of times task attempts have been killed for the given TaskInProgress 
+   * <br/>
+   * 
+   * @return number of killed task attempts.
+   */
+  int numKilledAttempts();
+
+  /**
+   * Gets the progress of the Task; the value is in the range 0.0-1.0.
+   * <br/>
+   * 
+   * @return progress of the task, between 0.0 and 1.0.
+   */
+  double getProgress();
+
+  /**
+   * Number of attempts currently running for the given TaskInProgress.<br/>
+   * 
+   * @return number of running attempts.
+   */
+  int numRunningAttempts();
+
+  /**
+   * Array of TaskStatus objects that are related to the corresponding
+   * TaskInProgress object.
+   * 
+   * @return array of TaskStatus objects of the task's attempts.
+   */
+  TaskStatus[] getTaskStatus();
+}

+ 121 - 0
src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java

@@ -0,0 +1,121 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+/**
+ * Abstract class which encapsulates the DaemonClient which is used in the 
+ * system tests.<br/>
+ * 
+ * @param <PROXY> the proxy implementation of a specific Daemon 
+ */
+public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
+  private Configuration conf;
+  private RemoteProcess process;
+  private boolean connected;
+
+  /**
+   * Create a Daemon client.<br/>
+   * 
+   * @param conf configuration to be used by the proxy to connect to the Daemon.
+   * @param process the Daemon process to manage the particular daemon.
+   * 
+   * @throws IOException
+   */
+  public AbstractDaemonClient(Configuration conf, RemoteProcess process) 
+      throws IOException {
+    this.conf = conf;
+    this.process = process;
+  }
+
+  public boolean isConnected() {
+    return connected;
+  }
+
+  protected void setConnected(boolean connected) {
+    this.connected = connected;
+  }
+
+  public abstract void connect() throws IOException;
+
+  public abstract void disconnect() throws IOException;
+
+  /**
+   * Get the proxy to connect to a particular service Daemon.<br/>
+   * 
+   * @return proxy to connect to a particular service Daemon.
+   */
+  protected abstract PROXY getProxy();
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public String getHostName() {
+    return process.getHostName();
+  }
+
+  public boolean isReady() throws IOException {
+    return getProxy().isReady();
+  }
+
+  public void kill() throws IOException {
+    process.kill();
+  }
+
+  public void ping() throws IOException {
+    getProxy().ping();
+  }
+
+  public void start() throws IOException {
+    process.start();
+  }
+
+  public ProcessInfo getProcessInfo() throws IOException {
+    return getProxy().getProcessInfo();
+  }
+
+  public void enable(List<Enum<?>> faults) throws IOException {
+    getProxy().enable(faults);
+  }
+
+  public void disableAll() throws IOException {
+    getProxy().disableAll();
+  }
+
+  public FileStatus getFileStatus(String path, boolean local) throws IOException {
+    return getProxy().getFileStatus(path, local);
+  }
+
+  public FileStatus[] listStatus(String path, boolean local) 
+    throws IOException {
+    return getProxy().listStatus(path, local);
+  }
+
+  public FileStatus[] listStatus(String f, boolean local, boolean recursive) 
+    throws IOException {
+    List<FileStatus> status = new ArrayList<FileStatus>();
+    addStatus(status, f, local, recursive);
+    return status.toArray(new FileStatus[0]);
+  }
+
+  private void addStatus(List<FileStatus> status, String f, 
+      boolean local, boolean recursive) 
+    throws IOException {
+    FileStatus[] fs = listStatus(f, local);
+    if (fs != null) {
+      for (FileStatus fileStatus : fs) {
+        if (!f.equals(fileStatus.getPath().toString())) {
+          status.add(fileStatus);
+          if (recursive) {
+            addStatus(status, fileStatus.getPath().toString(), local, recursive);
+          }
+        }
+      }
+    }
+  }
+}

+ 323 - 0
src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java

@@ -0,0 +1,323 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Abstract class which represents a cluster containing a single master and
+ * one or more slaves.<br/>
+ * 
+ * @param <MASTER>
+ *          master daemon client type.
+ * @param <SLAVE>
+ *          slave daemon client type.
+ */
+public abstract class AbstractMasterSlaveCluster
+    <MASTER extends AbstractDaemonClient, SLAVE extends AbstractDaemonClient> {
+
+  public static final String WAITFORMASTERKEY = 
+    "test.system.abstractmasterslavecluster.waitformaster";
+  
+  private static final Log LOG = 
+    LogFactory.getLog(AbstractMasterSlaveCluster.class);
+
+  private Configuration conf;
+  protected ClusterProcessManager clusterManager;
+  private MASTER master;
+  private Map<String, SLAVE> slaves = new HashMap<String, SLAVE>();
+  private boolean waitformaster = false;
+
+  /**
+   * Constructor to create a master slave cluster.<br/>
+   * 
+   * @param conf
+   *          Configuration to be used while constructing the cluster.
+   * @param rcluster
+   *          process manager instance to be used for managing the daemons.
+   * 
+   * @throws IOException
+   */
+  public AbstractMasterSlaveCluster(Configuration conf,
+      ClusterProcessManager rcluster) throws IOException {
+    this.conf = conf;
+    this.clusterManager = rcluster;
+    this.master = createMaster(clusterManager.getMaster());
+    Iterator<Map.Entry<String, RemoteProcess>> it = clusterManager.getSlaves()
+        .entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, RemoteProcess> entry = it.next();
+      slaves.put(entry.getKey(), createSlave(entry.getValue()));
+    }
+    this.waitformaster = conf.getBoolean(WAITFORMASTERKEY, true);
+  }
+
+  /**
+   * Method to create the master daemon client.<br/>
+   * 
+   * @param masterDaemon
+   *          remote process handle to manage the master daemon.
+   * @return instance of the daemon client of master daemon.
+   * 
+   * @throws IOException
+   */
+  protected abstract MASTER createMaster(RemoteProcess masterDaemon)
+      throws IOException;
+
+  /**
+   * Method to create the slave daemons clients.<br/>
+   * 
+   * @param slaveDaemon
+   *          remote process handle to manage the slave daemon.
+   * @return instance of the daemon clients of slave daemons.
+   * 
+   * @throws IOException
+   */
+  protected abstract SLAVE createSlave(RemoteProcess slaveDaemon)
+      throws IOException;
+
+  /**
+   * Get the global cluster configuration which was used to create the 
+   * cluster. <br/>
+   * 
+   * @return global configuration of the cluster.
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return the client handle of the master Daemon.<br/>
+   * 
+   * @return master daemon client handle.
+   */
+  public MASTER getMaster() {
+    return master;
+  }
+
+  /**
+   * Return the client handle of the slave Daemons.<br/>
+   * 
+   * @return map of host to slave daemon clients.
+   */
+  public Map<String, SLAVE> getSlaves() {
+    return slaves;
+  }
+
+  /**
+   * Checks if the master slave cluster is ready for testing. <br/>
+   * Algorithm for checking is as follows : <br/>
+   * <ul>
+   * <li> Wait for Daemon to come up </li>
+   * <li> Check if daemon is ready </li>
+   * <li> If one of the daemon is not ready, return false </li>
+   * </ul> 
+   * 
+   * @return true if whole cluster is ready.
+   * 
+   * @throws IOException
+   */
+  public boolean isReady() throws IOException {
+    LOG.info("Check if master is up and running");
+    waitForDaemon(master);
+    if (!master.isReady()) {
+      return false;
+    }
+    LOG.info("Check if slaves are up and running");
+    for (SLAVE slave : slaves.values()) {
+      waitForDaemon(slave);
+      if (!slave.isReady()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void waitForDaemon(AbstractDaemonClient d) {
+    while(true) {
+      try {
+        LOG.info("Waiting for daemon in host to come up : " + d.getHostName());
+        d.connect();
+        break;
+      } catch (IOException e) {
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the master slave cluster. <br/>
+   * The startup behavior is controlled by the {@code WAITFORMASTERKEY}.
+   * <ul>
+   * <li>If {@code WAITFORMASTERKEY} is set to true then the slaves are started
+   * only after the master daemon comes up and is ready to accept RPC
+   * connections.</li>
+   * <li>Else the daemons are started up sequentially without waiting for master
+   * daemon to be ready.</li>
+   * </ul>
+   * 
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    if (waitformaster) {
+      this.master.start();
+      waitForMaster();
+      startSlaves();
+    } else {
+      clusterManager.start();
+    }
+  }
+
+  private void waitForMaster() throws IOException {
+    waitForDaemon(master);
+    while (!master.isReady()) {
+      try {
+        LOG.info("Waiting for master daemon to be ready to accept " +
+        		"RPC connection");
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void startSlaves() throws IOException {
+    Map<String, RemoteProcess> slaves = clusterManager.getSlaves();
+    for (RemoteProcess s : slaves.values()) {
+      s.start();
+    }
+  }
+
+  /**
+   * Stops the master slave cluster.<br/>
+   * 
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    clusterManager.stop();
+  }
+
+  /**
+   * Connect to master and slave RPC ports.
+   * @throws IOException
+   */
+  public void connect() throws IOException {
+    LOG.info("Connecting to the cluster..." + getClass().getName());
+    master.connect();
+    for (SLAVE slave : slaves.values()) {
+      slave.connect();
+    }
+  }
+
+  /**
+   * Disconnect from master and slave RPC ports.
+   * @throws IOException
+   */
+  public void disconnect() throws IOException {
+    LOG.info("Disconnecting to the cluster..." + 
+        getClass().getName());
+    master.disconnect();
+    for (SLAVE slave : slaves.values()) {
+      slave.disconnect();
+    }
+    LOG.info("Disconnected!!");
+  }
+
+  /**
+   * Enable/inject the given faults. If the faults cannot be enabled on ALL
+   * nodes, the cluster is restarted and injection is retried.
+   */
+  public void enable(List<Enum<?>> faults) throws IOException {
+    try {
+      enableFaults(faults);
+    } catch (IOException e) {
+      stop();
+      start();
+      enableFaults(faults);
+    }
+  }
+
+  /**
+   * Disable/remove all the faults. If the faults cannot be disabled on ALL
+   * nodes, the cluster is restarted and removal is retried.
+   */
+  public void disableAllFaults() throws IOException {
+    try {
+      disableFaults();
+    } catch (IOException e) {
+      stop();
+      start();
+      disableFaults();
+    }
+  }
+
+  private void enableFaults(List<Enum<?>> faults) throws IOException {
+    master.enable(faults);
+    for (SLAVE slave : slaves.values()) {
+      slave.enable(faults);
+    }
+  }
+
+  private void disableFaults() throws IOException {
+    master.disableAll();
+    for (SLAVE slave : slaves.values()) {
+      slave.disableAll();
+    }
+  }
+
+  /**
+   * Ping all the daemons of the cluster.
+   * @throws IOException
+   */
+  public void ping() throws IOException {
+    MASTER master = getMaster();
+    LOG.info("Master is :" + master.getHostName() + " pinging ...");
+    master.ping();
+    Collection<SLAVE> slaves = getSlaves().values();
+    for (SLAVE slave : slaves) {
+      LOG.info("Slave is : " + slave.getHostName() + " pinging....");
+      slave.ping();
+    }
+  }
+
+  /**
+   * Connect to the cluster and ensure that it is clean to run tests.
+   * @throws Exception
+   */
+  public void setUp() throws Exception {
+    while (!isReady()) {
+      Thread.sleep(1000);
+    }
+    connect();
+    ping();
+    ensureClean();
+  }
+
+  /**
+   * Ensure that the cluster is clean to run tests.
+   * @throws IOException
+   */
+  public void ensureClean() throws IOException {
+  }
+
+  /**
+   * Ensure that cluster is clean. Disconnect from the RPC ports of the daemons.
+   * @throws IOException
+   */
+  public void tearDown() throws IOException {
+    ensureClean();
+    disconnect();
+  }
+}

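The start() behaviour described above hinges on the WAITFORMASTERKEY flag. A small sketch of flipping it; the values shown are illustrative, apart from the true default noted in the constructor.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;
    import org.apache.hadoop.test.system.AbstractMasterSlaveCluster;

    public class ClusterStartupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // true (the default): start the master, wait for it to accept RPCs,
        // then start the slaves. false: start all daemons through the
        // ClusterProcessManager without waiting for the master.
        conf.setBoolean(AbstractMasterSlaveCluster.WAITFORMASTERKEY, true);
        MRCluster cluster = MRCluster.createCluster(conf);
        cluster.start();
        // ... run tests ...
        cluster.stop();
      }
    }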
+ 90 - 0
src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java

@@ -0,0 +1,90 @@
+package org.apache.hadoop.test.system;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * RPC interface of a given Daemon.
+ */
+public interface DaemonProtocol extends VersionedProtocol{
+  long versionID = 1L;
+
+  /**
+   * Returns the Daemon configuration.
+   * @return Configuration
+   * @throws IOException
+   */
+  Configuration getDaemonConf() throws IOException;
+
+  /**
+   * Check if the Daemon is alive.
+   * 
+   * @throws IOException
+   *           if Daemon is unreachable.
+   */
+  void ping() throws IOException;
+
+  /**
+   * Check if the Daemon is ready to accept RPC connections.
+   * 
+   * @return true if Daemon is ready to accept RPC connection.
+   * @throws IOException
+   */
+  boolean isReady() throws IOException;
+
+  /**
+   * Get system level view of the Daemon process.
+   * 
+   * @return returns system level view of the Daemon process.
+   * 
+   * @throws IOException
+   */
+  ProcessInfo getProcessInfo() throws IOException;
+
+  /**
+   * Enable the set of specified faults in the Daemon.<br/>
+   * 
+   * @param faults
+   *          list of faults to be enabled.
+   * 
+   * @throws IOException
+   */
+  void enable(List<Enum<?>> faults) throws IOException;
+
+  /**
+   * Disable all the faults which are enabled in the Daemon. <br/>
+   * 
+   * @throws IOException
+   */
+  void disableAll() throws IOException;
+
+  /**
+   * Return a file status object that represents the path.
+   * @param path
+   *          given path
+   * @param local
+   *          whether the path is local or not
+   * @return a FileStatus object
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  FileStatus getFileStatus(String path, boolean local) throws IOException;
+
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   * 
+   * @param path
+   *          given path
+   * @param local
+   *          whether the path is local or not
+   * @return the statuses of the files/directories in the given path
+   * @throws IOException
+   */
+  FileStatus[] listStatus(String path, boolean local) throws IOException;
+}

+ 59 - 0
src/test/system/java/org/apache/hadoop/test/system/ProcessInfo.java

@@ -0,0 +1,59 @@
+package org.apache.hadoop.test.system;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Daemon system level process information.
+ */
+public interface ProcessInfo extends Writable {
+  /**
+   * Get the current time on the daemon clock, in milliseconds.<br/>
+   * 
+   * @return current time on the daemon clock in milliseconds.
+   */
+  public long currentTimeMillis();
+
+  /**
+   * Get the environment that was used to start the Daemon process.<br/>
+   * 
+   * @return the environment variable list.
+   */
+  public Map<String,String> getEnv();
+
+  /**
+   * Get the System properties of the Daemon process.<br/>
+   * 
+   * @return the properties list.
+   */
+  public Map<String,String> getSystemProperties();
+
+  /**
+   * Get the number of active threads in Daemon VM.<br/>
+   * 
+   * @return number of active threads in Daemon VM.
+   */
+  public int activeThreadCount();
+
+  /**
+   * Get the maximum heap size that is configured for the Daemon VM. <br/>
+   * 
+   * @return maximum heap size.
+   */
+  public long maxMemory();
+
+  /**
+   * Get the free memory in Daemon VM.<br/>
+   * 
+   * @return free memory.
+   */
+  public long freeMemory();
+
+  /**
+   * Get the total used memory in the Daemon VM. <br/>
+   * 
+   * @return total used memory.
+   */
+  public long totalMemory();
+}

+ 141 - 0
src/test/system/java/org/apache/hadoop/test/system/ProcessInfoImpl.java

@@ -0,0 +1,141 @@
+package org.apache.hadoop.test.system;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class ProcessInfoImpl implements ProcessInfo {
+
+  private int threadCount;
+  private long currentTime;
+  private long freemem;
+  private long maxmem;
+  private long totmem;
+  private Map<String, String> env;
+  private Map<String, String> props;
+
+  public ProcessInfoImpl() {
+    env = new HashMap<String, String>();
+    props = new HashMap<String, String>();
+  }
+
+  /**
+   * Construct a concrete process information object. <br/>
+   * 
+   * @param threadCount
+   *          count of active threads.
+   * @param currentTime
+   *          current time on the daemon clock in milliseconds.
+   * @param freemem
+   *          free memory in the daemon VM.
+   * @param maxmem
+   *          maximum heap size configured for the daemon VM.
+   * @param totmem
+   *          total memory of the daemon VM.
+   * @param env
+   *          environment variable list.
+   * @param props
+   *          system property list.
+   */
+  public ProcessInfoImpl(int threadCount, long currentTime, long freemem,
+      long maxmem, long totmem, Map<String, String> env, 
+      Map<String, String> props) {
+    this.threadCount = threadCount;
+    this.currentTime = currentTime;
+    this.freemem = freemem;
+    this.maxmem = maxmem;
+    this.totmem = totmem;
+    this.env = env;
+    this.props = props;
+  }
+
+  @Override
+  public int activeThreadCount() {
+    return threadCount;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+
+  @Override
+  public long freeMemory() {
+    return freemem;
+  }
+
+  @Override
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  @Override
+  public Map<String,String> getSystemProperties() {
+    return props;
+  }
+
+  @Override
+  public long maxMemory() {
+    return maxmem;
+  }
+
+  @Override
+  public long totalMemory() {
+    return totmem;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.threadCount = in.readInt();
+    this.currentTime = in.readLong();
+    this.freemem = in.readLong();
+    this.maxmem = in.readLong();
+    this.totmem = in.readLong();
+    read(in, env);
+    read(in, props);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(threadCount);
+    out.writeLong(currentTime);
+    out.writeLong(freemem);
+    out.writeLong(maxmem);
+    out.writeLong(totmem);
+    write(out, env);
+    write(out, props);
+  }
+
+  private void read(DataInput in, Map<String, String> map) throws IOException {
+    int size = in.readInt();
+    for (int i = 0; i < size; i = i + 2) {
+      String key = in.readUTF();
+      String value = in.readUTF();
+      map.put(key, value);
+    }
+  }
+
+  private void write(DataOutput out, Map<String, String> map) 
+  throws IOException {
+    int size = (map.size() * 2);
+    out.writeInt(size);
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer strBuf = new StringBuffer();
+    strBuf.append(String.format("active threads : %d\n", threadCount));
+    strBuf.append(String.format("current time  : %d\n", currentTime));
+    strBuf.append(String.format("free memory  : %d\n", freemem));
+    strBuf.append(String.format("total memory  : %d\n", totmem));
+    strBuf.append(String.format("max memory  : %d\n", maxmem));
+    strBuf.append("Environment Variables : \n");
+    for (Map.Entry<String, String> entry : env.entrySet()) {
+      strBuf.append(String.format("key : %s value : %s \n", entry.getKey(),
+          entry.getValue()));
+    }
+    return strBuf.toString();
+  }
+
+}
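
Since ProcessInfoImpl is a Writable, a quick round-trip check can be sketched
as below (illustrative only, not part of the patch; the numeric values and the
driver class are made up):

    import java.io.IOException;
    import java.util.HashMap;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.test.system.ProcessInfoImpl;

    class ProcessInfoRoundTripSketch {
      public static void main(String[] args) throws IOException {
        ProcessInfoImpl original = new ProcessInfoImpl(12, System.currentTimeMillis(),
            64L << 20, 512L << 20, 256L << 20,
            new HashMap<String, String>(), new HashMap<String, String>());

        // Serialize through the Writable contract ...
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // ... and read the bytes back into a fresh instance.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        ProcessInfoImpl copy = new ProcessInfoImpl();
        copy.readFields(in);

        System.out.println(copy);
      }
    }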

+ 71 - 0
src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java

@@ -0,0 +1,71 @@
+package org.apache.hadoop.test.system.process;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Interface to manage the remote processes in the master-slave cluster.
+ */
+public interface ClusterProcessManager {
+
+  /**
+   * The configuration key to specify the concrete implementation of the
+   * {@link ClusterProcessManager} to be used by
+   * {@link ClusterProcessManagerFactory}.
+   */
+  String IMPL_CLASS = "test.system.clusterprocessmanager.impl.class";
+
+  /**
+   * Enumeration used to specify the types of the clusters which are supported
+   * by the concrete implementations of {@link ClusterProcessManager}.
+   */
+  public enum ClusterType {
+    MAPRED, HDFS
+  }
+  
+  /**
+   * Initialization method to set the cluster type and to pass the configuration
+   * object which is required by the ClusterProcessManager to manage the 
+   * cluster.<br/>
+   * The configuration object should typically contain all the parameters which 
+   * are required by the implementation.<br/>
+   *  
+   * @param t type of the cluster to be managed.
+   * @param conf configuration containing values of the specific keys which 
+   * are required by the implementation of the cluster process manager.
+   * 
+   * @throws Exception when initialization fails.
+   */
+  void init(ClusterType t, Configuration conf) throws Exception;
+
+  /**
+   * Getter for master daemon process for managing the master daemon.<br/>
+   * 
+   * @return master daemon process.
+   */
+  RemoteProcess getMaster();
+
+  /**
+   * Getter for slave daemon process for managing the slaves.<br/>
+   * 
+   * @return map of slave hosts to slave daemon process.
+   */
+  Map<String, RemoteProcess> getSlaves();
+
+  /**
+   * Method to start the cluster, including the master and all slaves.<br/>
+   * 
+   * @throws IOException if startup procedure fails.
+   */
+  void start() throws IOException;
+
+  /**
+   * Method to shut down the master and all slaves.<br/>
+   * 
+   * @throws IOException if shutdown procedure fails.
+   */
+  void stop() throws IOException;
+
+}
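
A lifecycle sketch (illustrative, not part of the patch; the helper class is
hypothetical): given a concrete ClusterProcessManager, a test driver would
typically init, start, inspect and stop the cluster. How the manager instance
is obtained is left to the caller (see the factory below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.test.system.process.ClusterProcessManager;
    import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
    import org.apache.hadoop.test.system.process.RemoteProcess;

    class ClusterLifecycleSketch {
      // Drives a manager through init/start/stop and prints the daemon hosts.
      static void exercise(ClusterProcessManager manager, Configuration conf)
          throws Exception {
        manager.init(ClusterType.MAPRED, conf);
        manager.start();
        System.out.println("master on " + manager.getMaster().getHostName());
        for (RemoteProcess slave : manager.getSlaves().values()) {
          System.out.println("slave on " + slave.getHostName());
        }
        manager.stop();
      }
    }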

+ 35 - 0
src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java

@@ -0,0 +1,35 @@
+package org.apache.hadoop.test.system.process;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
+
+/**
+ * Factory to create ClusterProcessManager handle.
+ */
+public class ClusterProcessManagerFactory {
+
+  /**
+   * Factory method to create the {@link ClusterProcessManager} based on the
+   * {@code ClusterProcessManager.IMPL_CLASS} value. <br/>
+   * 
+   * @param t type of the cluster to be managed by the instance.
+   * @param conf the configuration required by the instance for 
+   * management of the cluster.
+   * @return instance of the cluster process manager to be used for management.
+   * 
+   * @throws Exception if the instance cannot be created or initialized.
+   */
+  public static ClusterProcessManager createInstance(ClusterType t,
+      Configuration conf) throws Exception {
+    String implKlass = conf.get(ClusterProcessManager.IMPL_CLASS, System
+        .getProperty(ClusterProcessManager.IMPL_CLASS));
+    if (implKlass == null || implKlass.isEmpty()) {
+      implKlass = HadoopDaemonRemoteCluster.class.getName();
+    }
+    Class<ClusterProcessManager> klass = (Class<ClusterProcessManager>) Class
+        .forName(implKlass);
+    ClusterProcessManager k = klass.newInstance();
+    k.init(t, conf);
+    return k;
+  }
+}
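
Typical use of the factory (illustrative, not part of the patch): the
IMPL_CLASS key may point at a custom implementation, and the class name used
here is purely hypothetical; when the key is unset the factory falls back to
HadoopDaemonRemoteCluster:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.test.system.process.ClusterProcessManager;
    import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
    import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory;

    class FactorySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical custom implementation; omit this to get the default
        // HadoopDaemonRemoteCluster.
        conf.set(ClusterProcessManager.IMPL_CLASS,
            "com.example.MyClusterProcessManager");
        ClusterProcessManager manager =
            ClusterProcessManagerFactory.createInstance(ClusterType.HDFS, conf);
        // createInstance() has already called init(), so start()/stop() can follow.
        manager.start();
        manager.stop();
      }
    }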

+ 274 - 0
src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java

@@ -0,0 +1,274 @@
+package org.apache.hadoop.test.system.process;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * The concrete class which implements the start-up and shut-down routines
+ * based on hadoop-daemon.sh. <br/>
+ * 
+ * The class requires two keys to be present in the Configuration object passed
+ * to it. Look at <code>CONF_HADOOPHOME</code> and
+ * <code>CONF_HADOOPCONFDIR</code> for the names of the
+ * configuration keys.
+ * 
+ * The final command that gets executed will have the following format: 
+ * <br/>
+ * <code>
+ *  ssh master-host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName 
+ *  --config HADOOP_CONF_DIR (start|stop) masterCommand'
+ * </code>
+ */
+public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
+
+  private static final Log LOG = LogFactory
+      .getLog(HadoopDaemonRemoteCluster.class.getName());
+
+  /**
+   * Key used to configure the HADOOP_HOME to be used by the
+   * HadoopDaemonRemoteCluster.
+   */
+  public final static String CONF_HADOOPHOME = "test.system.hdrc.hadoophome";
+  /**
+   * Key used to configure the HADOOP_CONF_DIR to be used by the
+   * HadoopDaemonRemoteCluster.
+   */
+  public final static String CONF_HADOOPCONFDIR = 
+    "test.system.hdrc.hadoopconfdir";
+
+  public final static String CONF_DEPLOYED_HADOOPCONFDIR =
+    "test.system.hdrc.deployed.hadoopconfdir";
+
+  private String hadoopHome;
+  private String hadoopConfDir;
+  private String deployed_hadoopConfDir;
+  private String masterCommand;
+  private String slaveCommand;
+
+  private RemoteProcess master;
+  private Map<String, RemoteProcess> slaves;
+
+  @Override
+  public void init(ClusterType t, Configuration conf) throws Exception {
+    /*
+     * Initialization of the HadoopDaemonRemoteCluster is a three-stage process:
+     * 1. Populate the script names based on the type of the passed cluster.
+     * 2. Populate the required directories.
+     * 3. Populate the master and slaves.
+     */
+    populateScriptNames(t);
+    populateDirectories(conf);
+    this.slaves = new HashMap<String, RemoteProcess>();
+    populateDaemons(deployed_hadoopConfDir);
+  }
+
+  /**
+   * Method to populate the required master and slave commands which are used to
+   * manage the cluster.<br/>
+   * 
+   * @param t
+   *          type of cluster to be initialized.
+   * 
+   * @throws UnsupportedOperationException
+   *           if the passed cluster type is not MAPRED or HDFS
+   */
+  private void populateScriptNames(ClusterType t) {
+    switch (t) {
+    case MAPRED:
+      masterCommand = "jobtracker";
+      slaveCommand = "tasktracker";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created mapred hadoop daemon remote cluster manager with "
+            + "scriptName: mapred, masterCommand: jobtracker, "
+            + "slaveCommand: tasktracker");
+      }
+      break;
+    case HDFS:
+      masterCommand = "namenode";
+      slaveCommand = "datanode";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created hdfs hadoop daemon remote cluster manager with "
+            + "scriptName: hdfs, masterCommand: namenode, "
+            + "slaveCommand: datanode");
+      }
+      break;
+    default:
+      LOG.error("Cluster type :" + t
+          + "is not supported currently by HadoopDaemonRemoteCluster");
+      throw new UnsupportedOperationException(
+          "The specified cluster type is not supported by the " +
+          "HadoopDaemonRemoteCluster");
+    }
+  }
+
+  /**
+   * Method to populate the hadoop home and hadoop configuration directories.
+   * 
+   * @param conf
+   *          Configuration object containing values for
+   *          TEST_SYSTEM_HADOOPHOME_CONF_KEY and
+   *          TEST_SYSTEM_HADOOPCONFDIR_CONF_KEY
+   * 
+   * @throws IllegalArgumentException
+   *           if the configuration or system property set does not contain
+   *           values for the required keys.
+   */
+  private void populateDirectories(Configuration conf) {
+    hadoopHome = conf.get(CONF_HADOOPHOME, System
+        .getProperty(CONF_HADOOPHOME));
+    hadoopConfDir = conf.get(CONF_HADOOPCONFDIR, System
+        .getProperty(CONF_HADOOPCONFDIR));
+
+    deployed_hadoopConfDir = conf.get(CONF_DEPLOYED_HADOOPCONFDIR,
+      System.getProperty(CONF_DEPLOYED_HADOOPCONFDIR));
+    if (deployed_hadoopConfDir == null || deployed_hadoopConfDir.isEmpty()) {
+      deployed_hadoopConfDir = hadoopConfDir;
+    }
+
+    if (hadoopHome == null || hadoopConfDir == null || hadoopHome.isEmpty()
+        || hadoopConfDir.isEmpty()) {
+      LOG.error("No configuration "
+          + "for the HADOOP_HOME and HADOOP_CONF_DIR passed");
+      throw new IllegalArgumentException(
+          "No Configuration passed for hadoop home " +
+          "and hadoop conf directories");
+    }
+
+  }
+
+  @Override
+  public RemoteProcess getMaster() {
+    return master;
+  }
+
+  @Override
+  public Map<String, RemoteProcess> getSlaves() {
+    return slaves;
+  }
+
+  @Override
+  public void start() throws IOException {
+    // start master first.
+    master.start();
+    for (RemoteProcess slave : slaves.values()) {
+      slave.start();
+    }
+  }
+
+  @Override
+  public void stop() throws IOException {
+    master.kill();
+    for (RemoteProcess slave : slaves.values()) {
+      slave.kill();
+    }
+  }
+
+  private void populateDaemons(String confLocation) throws IOException {
+    File mastersFile = new File(confLocation, "masters");
+    File slavesFile = new File(confLocation, "slaves");
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new FileReader(mastersFile));
+      String masterHost = null;
+      masterHost = reader.readLine();
+      if (masterHost != null && !masterHost.trim().isEmpty()) {
+        master = new ScriptDaemon(masterCommand, masterHost);
+      }
+    } finally {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Can't close masters file from " + confLocation);
+      }
+    }
+    try {
+      reader = new BufferedReader(new FileReader(slavesFile));
+      String slaveHost = null;
+      while ((slaveHost = reader.readLine()) != null) {
+        RemoteProcess slave = new ScriptDaemon(slaveCommand, slaveHost);
+        slaves.put(slaveHost, slave);
+      }
+    } finally {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Can't close slaves file from " + confLocation);
+      }
+    }
+  }
+
+  /**
+   * The core daemon class which actually implements the remote process
+   * management of actual daemon processes in the cluster.
+   * 
+   */
+  class ScriptDaemon implements RemoteProcess {
+
+    private static final String STOP_COMMAND = "stop";
+    private static final String START_COMMAND = "start";
+    private static final String SCRIPT_NAME = "hadoop-daemon.sh";
+    private final String daemonName;
+    private final String hostName;
+
+    public ScriptDaemon(String daemonName, String hostName) {
+      this.daemonName = daemonName;
+      this.hostName = hostName;
+    }
+
+    @Override
+    public String getHostName() {
+      return hostName;
+    }
+
+    private ShellCommandExecutor buildCommandExecutor(String command) {
+      String[] commandArgs = getCommand(command);
+      File binDir = getBinDir();
+      HashMap<String, String> env = new HashMap<String, String>();
+      env.put("HADOOP_CONF_DIR", hadoopConfDir);
+      ShellCommandExecutor executor = new ShellCommandExecutor(commandArgs,
+          binDir, env);
+      LOG.info(executor.toString());
+      return executor;
+    }
+
+    private File getBinDir() {
+      File binDir = new File(hadoopHome, "bin");
+      return binDir;
+    }
+
+    private String[] getCommand(String command) {
+      ArrayList<String> cmdArgs = new ArrayList<String>();
+      File binDir = getBinDir();
+      cmdArgs.add("ssh");
+      cmdArgs.add(hostName);
+      cmdArgs.add(binDir.getAbsolutePath() + File.separator + SCRIPT_NAME);
+      cmdArgs.add("--config");
+      cmdArgs.add(hadoopConfDir);
+      // XXX The 0.20 internal version does not support the --script option.
+      cmdArgs.add(command);
+      cmdArgs.add(daemonName);
+      return (String[]) cmdArgs.toArray(new String[cmdArgs.size()]);
+    }
+
+    @Override
+    public void kill() throws IOException {
+      buildCommandExecutor(STOP_COMMAND).execute();
+    }
+
+    @Override
+    public void start() throws IOException {
+      buildCommandExecutor(START_COMMAND).execute();
+    }
+  }
+
+}
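
A configuration sketch (illustrative, not part of the patch; the paths and the
driver class are hypothetical) showing the two mandatory keys that
populateDirectories() reads before the masters/slaves files are parsed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
    import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;

    class RemoteClusterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Both keys are mandatory; init() throws IllegalArgumentException otherwise.
        conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME, "/grid/hadoop");
        conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR, "/grid/hadoop/conf");

        HadoopDaemonRemoteCluster cluster = new HadoopDaemonRemoteCluster();
        // init() reads the masters/slaves files from the (deployed) conf dir and
        // builds one ScriptDaemon per host, driven over ssh + hadoop-daemon.sh.
        cluster.init(ClusterType.MAPRED, conf);
        cluster.start();
        cluster.stop();
      }
    }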

+ 29 - 0
src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java

@@ -0,0 +1,29 @@
+package org.apache.hadoop.test.system.process;
+
+import java.io.IOException;
+
+/**
+ * Interface to manage the remote process.
+ */
+public interface RemoteProcess {
+  /**
+   * Get the host on which the daemon process is running/stopped.<br/>
+   * 
+   * @return hostname on which process is running/stopped.
+   */
+  String getHostName();
+
+  /**
+   * Start a given daemon process.<br/>
+   * 
+   * @throws IOException if startup fails.
+   */
+  void start() throws IOException;
+
+  /**
+   * Stop a given daemon process.<br/>
+   * 
+   * @throws IOException if shutdown fails.
+   */
+  void kill() throws IOException;
+}
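
A small sketch (illustrative, not part of the patch; the helper is
hypothetical) of bouncing a single daemon through this interface:

    import java.io.IOException;
    import org.apache.hadoop.test.system.process.RemoteProcess;

    class DaemonRestartSketch {
      // Restarts one daemon; handles come from ClusterProcessManager.getMaster()
      // or getSlaves().
      static void restart(RemoteProcess daemon) throws IOException {
        System.out.println("restarting daemon on " + daemon.getHostName());
        daemon.kill();
        daemon.start();
      }
    }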

+ 83 - 0
src/test/testjar/UserNamePermission.java

@@ -0,0 +1,83 @@
+package testjar;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class UserNamePermission {
+
+  private static final Log LOG = LogFactory.getLog(UserNamePermission.class);
+  // This mapper reads the user name and passes it on to the reducer.
+  public static class UserNameMapper extends Mapper<LongWritable,Text,Text,Text> {
+    Text key1 = new Text("UserName");
+    public void map(LongWritable key, Text value, Context context)
+      throws IOException,InterruptedException {
+      Text val = new Text(System.getProperty("user.name").toString());
+      context.write(key1, val);
+    }
+  }
+
+  // The reducer is responsible for writing the user name to the output file,
+  // which will be validated by the testcase.
+  public static class UserNameReducer extends Reducer<Text,Text,Text,Text> {
+    // Note: the parameter must be an Iterable (not an Iterator) for this method
+    // to override Reducer#reduce and actually be invoked by the framework.
+    @Override
+    public void reduce(Text key, Iterable<Text> values,
+        Context context) throws IOException, InterruptedException {
+      LOG.info("The key " + key);
+      Iterator<Text> it = values.iterator();
+      if (it.hasNext()) {
+        Text val = it.next();
+        LOG.info("The value " + val);
+        context.write(key, new Text(System.getProperty("user.name")));
+      }
+    }
+  }
+		
+  public static void main(String[] args) throws Exception {
+    Path outDir = new Path("output");
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "user name check");
+
+    job.setJarByClass(UserNamePermission.class);
+    job.setMapperClass(UserNamePermission.UserNameMapper.class);
+    job.setCombinerClass(UserNamePermission.UserNameReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setReducerClass(UserNamePermission.UserNameReducer.class);
+    job.setNumReduceTasks(1);
+
+    job.setInputFormatClass(TextInputFormat.class);
+    TextInputFormat.addInputPath(job, new Path("input"));
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+
+}
+
+
+