소스 검색

MAPREDUCE-7262. MRApp helpers block for long intervals (500ms)

Signed-off-by: Jonathan Eagles <jeagles@gmail.com>
(cherry picked from commit 3f01c481060585ccd37be9db8aa4d1e33d2e2d6b)
Ahmed Hussein 5 년 전
부모
커밋
1f315c28d9

+ 31 - 52
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -113,6 +113,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("unchecked")
 public class MRApp extends MRAppMaster {
   private static final Logger LOG = LoggerFactory.getLogger(MRApp.class);
+  private static final int WAIT_FOR_STATE_CNT = 200;
+  private static final int WAIT_FOR_STATE_INTERVAL= 50;
 
   /**
    * The available resource of each container allocated.
@@ -322,13 +324,11 @@ public class MRApp extends MRAppMaster {
       JobStateInternal finalState) throws Exception {
     int timeoutSecs = 0;
     JobStateInternal iState = job.getInternalState();
-    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
-      System.out.println("Job Internal State is : " + iState
-          + " Waiting for Internal state : " + finalState);
-      Thread.sleep(500);
+    while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+      Thread.sleep(WAIT_FOR_STATE_INTERVAL);
       iState = job.getInternalState();
     }
-    System.out.println("Task Internal State is : " + iState);
+    LOG.info("Job {} Internal State is : {}", job.getID(), iState);
     Assert.assertEquals("Task Internal state is not correct (timedout)",
         finalState, iState);
   }
@@ -336,17 +336,12 @@ public class MRApp extends MRAppMaster {
   public void waitForInternalState(TaskImpl task,
       TaskStateInternal finalState) throws Exception {
     int timeoutSecs = 0;
-    TaskReport report = task.getReport();
     TaskStateInternal iState = task.getInternalState();
-    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
-      System.out.println("Task Internal State is : " + iState
-          + " Waiting for Internal state : " + finalState + "   progress : "
-          + report.getProgress());
-      Thread.sleep(500);
-      report = task.getReport();
+    while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+      Thread.sleep(WAIT_FOR_STATE_INTERVAL);
       iState = task.getInternalState();
     }
-    System.out.println("Task Internal State is : " + iState);
+    LOG.info("Task {} Internal State is : {}", task.getID(), iState);
     Assert.assertEquals("Task Internal state is not correct (timedout)",
         finalState, iState);
   }
@@ -354,17 +349,12 @@ public class MRApp extends MRAppMaster {
   public void waitForInternalState(TaskAttemptImpl attempt,
       TaskAttemptStateInternal finalState) throws Exception {
     int timeoutSecs = 0;
-    TaskAttemptReport report = attempt.getReport();
     TaskAttemptStateInternal iState = attempt.getInternalState();
-    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
-      System.out.println("TaskAttempt Internal State is : " + iState
-          + " Waiting for Internal state : " + finalState + "   progress : "
-          + report.getProgress());
-      Thread.sleep(500);
-      report = attempt.getReport();
+    while (!finalState.equals(iState) && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+      Thread.sleep(WAIT_FOR_STATE_INTERVAL);
       iState = attempt.getInternalState();
     }
-    System.out.println("TaskAttempt Internal State is : " + iState);
+    LOG.info("TaskAttempt {} Internal State is : {}", attempt.getID(), iState);
     Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
         finalState, iState);
   }
@@ -374,17 +364,12 @@ public class MRApp extends MRAppMaster {
     int timeoutSecs = 0;
     TaskAttemptReport report = attempt.getReport();
     while (!finalState.equals(report.getTaskAttemptState()) &&
-        timeoutSecs++ < 20) {
-      System.out.println(
-          "TaskAttempt " + attempt.getID().toString() + "  State is : "
-              + report.getTaskAttemptState()
-              + " Waiting for state : " + finalState
-              + "   progress : " + report.getProgress());
+        timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+      Thread.sleep(WAIT_FOR_STATE_INTERVAL);
       report = attempt.getReport();
-      Thread.sleep(500);
     }
-    System.out.println("TaskAttempt State is : "
-        + report.getTaskAttemptState());
+    LOG.info("TaskAttempt {} State is : {}", attempt.getID(),
+        report.getTaskAttemptState());
     Assert.assertEquals("TaskAttempt state is not correct (timedout)",
         finalState,
         report.getTaskAttemptState());
@@ -418,14 +403,11 @@ public class MRApp extends MRAppMaster {
     int timeoutSecs = 0;
     TaskReport report = task.getReport();
     while (!finalState.equals(report.getTaskState()) &&
-        timeoutSecs++ < 20) {
-      System.out.println("Task State for " + task.getID() + " is : "
-          + report.getTaskState() + " Waiting for state : " + finalState
-          + "   progress : " + report.getProgress());
+        timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+      Thread.sleep(WAIT_FOR_STATE_INTERVAL);
       report = task.getReport();
-      Thread.sleep(500);
     }
-    System.out.println("Task State is : " + report.getTaskState());
+    LOG.info("Task {} State is : {}", task.getID(), report.getTaskState());
     Assert.assertEquals("Task state is not correct (timedout)", finalState,
         report.getTaskState());
   }
@@ -434,15 +416,11 @@ public class MRApp extends MRAppMaster {
     int timeoutSecs = 0;
     JobReport report = job.getReport();
     while (!finalState.equals(report.getJobState()) &&
-        timeoutSecs++ < 20) {
-      System.out.println("Job State is : " + report.getJobState() +
-          " Waiting for state : " + finalState +
-          "   map progress : " + report.getMapProgress() + 
-          "   reduce progress : " + report.getReduceProgress());
+        timeoutSecs++ < WAIT_FOR_STATE_CNT) {
       report = job.getReport();
-      Thread.sleep(500);
+      Thread.sleep(WAIT_FOR_STATE_INTERVAL);
     }
-    System.out.println("Job State is : " + report.getJobState());
+    LOG.info("Job {} State is : {}", job.getID(), report.getJobState());
     Assert.assertEquals("Job state is not correct (timedout)", finalState, 
         job.getState());
   }
@@ -453,12 +431,11 @@ public class MRApp extends MRAppMaster {
            waitForServiceToStop(20 * 1000));
     } else {
       int timeoutSecs = 0;
-      while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) {
-        System.out.println("MRApp State is : " + getServiceState()
-            + " Waiting for state : " + finalState);
-        Thread.sleep(500);
+      while (!finalState.equals(getServiceState())
+          && timeoutSecs++ < WAIT_FOR_STATE_CNT) {
+        Thread.sleep(WAIT_FOR_STATE_INTERVAL);
       }
-      System.out.println("MRApp State is : " + getServiceState());
+      LOG.info("MRApp State is : {}", getServiceState());
       Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
           getServiceState());
     }
@@ -467,16 +444,18 @@ public class MRApp extends MRAppMaster {
   public void verifyCompleted() {
     for (Job job : getContext().getAllJobs().values()) {
       JobReport jobReport = job.getReport();
-      System.out.println("Job start time :" + jobReport.getStartTime());
-      System.out.println("Job finish time :" + jobReport.getFinishTime());
+      LOG.info("Job start time :{}", jobReport.getStartTime());
+      LOG.info("Job finish time :", jobReport.getFinishTime());
       Assert.assertTrue("Job start time is not less than finish time",
           jobReport.getStartTime() <= jobReport.getFinishTime());
       Assert.assertTrue("Job finish time is in future",
           jobReport.getFinishTime() <= System.currentTimeMillis());
       for (Task task : job.getTasks().values()) {
         TaskReport taskReport = task.getReport();
-        System.out.println("Task start time : " + taskReport.getStartTime());
-        System.out.println("Task finish time : " + taskReport.getFinishTime());
+        LOG.info("Task {} start time : {}", task.getID(),
+            taskReport.getStartTime());
+        LOG.info("Task {} finish time : {}", task.getID(),
+            taskReport.getFinishTime());
         Assert.assertTrue("Task start time is not less than finish time",
             taskReport.getStartTime() <= taskReport.getFinishTime());
         for (TaskAttempt attempt : task.getAttempts().values()) {