Browse Source

commit c425a4629b07b40021521266fb43997c85208ae4
Author: Balaji Rajagopalan <balajirg@yahoo-inc.com>
Date: Tue Jul 6 10:17:53 2010 -0700

MAPREDUCE-1758 from https://issues.apache.org/jira/secure/attachment/12448550/bb_patch_2.txt


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077530 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
a0bd1ff132

+ 4 - 14
src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfMemoryExceedsTask.java

@@ -290,11 +290,8 @@ public class TestChildsKillingOfMemoryExceedsTask {
     Assert.assertTrue("Map process is not alive before task fails.", 
         ttIns.isProcessTreeAlive(pid));
 
-    while (ttIns.getTask(tID).getTaskStatus().getRunState() 
-        == TaskStatus.State.RUNNING) {
-      UtilsForTests.waitFor(1000);
-      ttIns = ttClientIns.getProxy();
-    }
+    Assert.assertTrue("Task did not stop " + tID, 
+        ttClientIns.isTaskStopped(tID));
 
     String[] taskDiagnostics = runJob.getTaskDiagnostics(tAttID);
     Assert.assertNotNull("Task diagnostics is null.", taskDiagnostics);
@@ -307,15 +304,8 @@ public class TestChildsKillingOfMemoryExceedsTask {
 
     LOG.info("Waiting till the job is completed...");
     counter = 0;
-    while (counter < 60) {
-      if (jInfo.getStatus().isJobComplete()) {
-        break;
-      }
-      UtilsForTests.waitFor(1000);
-      jInfo = wovenClient.getJobInfo(id);
-      counter ++;
-    }
-    Assert.assertTrue("Job has not been completed...", counter != 60);
+    Assert.assertTrue("Job has not been completed...", 
+        cluster.getJTClient().isJobStopped(id));
     ttIns = ttClientIns.getProxy();
     ttIns.sendAction(action);
     UtilsForTests.waitFor(1000);

+ 1 - 8
src/test/system/java/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java

@@ -297,14 +297,7 @@ public class TestDistributedCacheModifiedFile {
         }
       }
       //Allow the job to continue through MR control job.
-      for (TaskInfo taskInfoRemaining : taskInfos) {
-        FinishTaskControlAction action = new FinishTaskControlAction(TaskID
-           .downgrade(taskInfoRemaining.getTaskID()));
-        Collection<TTClient> tts = cluster.getTTClients();
-        for (TTClient cli : tts) {
-          cli.getProxy().sendAction(action);
-        }
-      }
+      cluster.signalAllTasks(rJob.getID());
 
       //Killing the job because all the verification needed
       //for this testcase is completed.

+ 5 - 5
src/test/system/java/org/apache/hadoop/mapred/TestJobKill.java

@@ -136,12 +136,12 @@ public class TestJobKill {
       RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
       JobInfo info = wovenClient.getJobInfo(rJob.getID());
       Assert.assertNotNull("Job Info is null",info);
-      JobID id = rJob.getID();
-      while (info.runningMaps() != 1) {
-        Thread.sleep(1000);
-        info = wovenClient.getJobInfo(id);
-      }
+      JobID id = rJob.getID();      
+      Assert.assertTrue("Failed to start the job",
+          cluster.getJTClient().isJobStarted(id));
       rJob.killJob();
+      Assert.assertTrue("Failed to kill the job",
+          cluster.getJTClient().isJobStopped(id));
     }
     checkCleanup(jconf);
     deleteOutputDir();

+ 22 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java

@@ -328,6 +328,28 @@ public class JTClient extends MRDaemonClient<JTProtocol> {
     LOG.info("Verified the job history for the jobId : " + jobId);
   }
 
+  /**
+   * The method provides the information on the job has stopped or not
+   * @return indicates true if the job has stopped false otherwise.
+   * @param job id has the information of the running job.
+   * @throw IOException is thrown if the job info cannot be fetched.   
+   */
+  public boolean isJobStopped(JobID id) throws IOException{
+    int counter = 0;
+    JobInfo jInfo = getProxy().getJobInfo(id);
+    if(jInfo != null ) {
+      while (counter < 60) {
+        if (jInfo.getStatus().isJobComplete()) {
+          break;
+        }
+        UtilsForTests.waitFor(1000);
+        jInfo = getProxy().getJobInfo(id);
+        counter ++;
+      }
+    }
+    return (counter != 60)? true : false;
+  }
+
   /**
    * It uses to check whether job is started or not.
    * @param id job id

+ 23 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java

@@ -34,6 +34,9 @@ import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
 import org.apache.hadoop.test.system.process.MultiUserHadoopDaemonRemoteCluster;
 import org.apache.hadoop.test.system.process.RemoteProcess;
 import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.HadoopDaemonInfo;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskID;
+import java.util.Collection;
 
 /**
  * Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
@@ -143,6 +146,26 @@ public class MRCluster extends AbstractDaemonCluster {
           org.apache.hadoop.mapred.JobID.downgrade(job.getID()));
     }
   }
+  /**
+    * Allow the job to continue through MR control job.
+    * @param id of the job. 
+    * @throws IOException when failed to get task info. 
+    */
+  public void signalAllTasks(JobID id) throws IOException{
+    TaskInfo[] taskInfos = getJTClient().getProxy().getTaskInfo(id);
+    if(taskInfos !=null) {
+      for (TaskInfo taskInfoRemaining : taskInfos) {
+        if(taskInfoRemaining != null) {
+          FinishTaskControlAction action = new FinishTaskControlAction(TaskID
+              .downgrade(taskInfoRemaining.getTaskID()));
+          Collection<TTClient> tts = getTTClients();
+          for (TTClient cli : tts) {
+            cli.getProxy().sendAction(action);
+          }
+        }
+      }  
+    }
+  }
 
   @Override
   protected AbstractDaemonClient createClient(

+ 27 - 0
src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java

@@ -27,6 +27,9 @@ import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.TaskTrackerStatus;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.UtilsForTests;
 
 /**
  * TaskTracker client for system tests. Assumption of the class is that the
@@ -86,5 +89,29 @@ public class TTClient extends MRDaemonClient<TTProtocol> {
   public TaskTrackerStatus getStatus() throws IOException {
     return getProxy().getStatus();
   }
+  
+  /**
+   * This methods provides the information on the particular task managed
+   * by a task tracker has stopped or not. 
+   * @param TaskID is id of the task to get the status.
+   * @throws IOException if there is an error. 
+   * @return true is stopped. 
+   */
+  public boolean isTaskStopped(TaskID tID) throws IOException {
+    int counter = 0;
+    if(tID != null && proxy.getTask(tID) != null) {
+      TaskStatus.State tState= proxy.getTask(tID).getTaskStatus().getRunState();
+      while ( counter < 60) {
+        if(tState != TaskStatus.State.RUNNING && 
+            tState != TaskStatus.State.UNASSIGNED) {
+          break;
+        }
+        UtilsForTests.waitFor(1000);
+        tState= proxy.getTask(tID).getTaskStatus().getRunState();
+        counter++;
+      }      
+    }
+    return (counter != 60)? true : false;
+  }
 
 }