Browse source code

AMBARI-4758. Failure tolerance parameter doesn't seem to work for Rolling Restarts. (swagle)

Siddharth Wagle 11 years ago
parent
commit
b7856cbcf7

+ 8 - 2
ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java

@@ -18,6 +18,7 @@
 package org.apache.ambari.server.scheduler;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.state.scheduler.BatchRequestJob;
 import org.quartz.DateBuilder;
 import org.quartz.DisallowConcurrentExecution;
 import org.quartz.JobDataMap;
@@ -42,8 +43,6 @@ import static org.quartz.TriggerBuilder.newTrigger;
  * template method "doWork()" (where the extending Job class's real work goes)
  * and then it schedules the follow-up job.
  */
-@PersistJobDataAfterExecution
-@DisallowConcurrentExecution
 public abstract class AbstractLinearExecutionJob implements ExecutionJob {
   private static Logger LOG = LoggerFactory.getLogger(AbstractLinearExecutionJob.class);
   protected ExecutionScheduleManager executionScheduleManager;
@@ -129,13 +128,20 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
     }
 
     int separationSeconds = jobDataMap.getIntValue(NEXT_EXECUTION_SEPARATION_SECONDS);
+    Object failedCount = properties.get(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY);
+    Object totalCount = properties.get(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY);
 
     // Create trigger for next job execution
+    // Persist counts with trigger, so that they apply to current batch only
     Trigger trigger = newTrigger()
       .forJob(nextJobName, nextJobGroup)
       .withIdentity("TriggerForJob-" + nextJobName, LINEAR_EXECUTION_TRIGGER_GROUP)
       .withSchedule(simpleSchedule().withMisfireHandlingInstructionFireNow())
       .startAt(futureDate(separationSeconds, DateBuilder.IntervalUnit.SECOND))
+      .usingJobData(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY,
+        failedCount != null ? (Integer) failedCount : 0)
+      .usingJobData(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY,
+        totalCount != null ? (Integer) totalCount : 0)
       .build();
 
     executionScheduleManager.scheduleJob(trigger);

+ 3 - 2
ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java

@@ -43,6 +43,7 @@ import org.apache.ambari.server.state.scheduler.Schedule;
 import org.apache.ambari.server.utils.DateUtils;
 import org.apache.commons.lang.text.StrBuilder;
 import org.quartz.CronExpression;
+import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobKey;
@@ -300,8 +301,7 @@ public class ExecutionScheduleManager {
       List<BatchRequest> batchRequests = batch.getBatchRequests();
       if (batchRequests != null) {
         Collections.sort(batchRequests);
-        ListIterator<BatchRequest> iterator = batchRequests.listIterator
-          (batchRequests.size());
+        ListIterator<BatchRequest> iterator = batchRequests.listIterator(batchRequests.size());
         String nextJobName = null;
         while (iterator.hasPrevious()) {
           BatchRequest batchRequest = iterator.previous();
@@ -726,3 +726,4 @@ public class ExecutionScheduleManager {
     }
   }
 }
+

+ 9 - 5
ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchRequestJob.java

@@ -23,12 +23,15 @@ import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
 import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.PersistJobDataAfterExecution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.HashMap;
 import java.util.Map;
 
+@PersistJobDataAfterExecution
+@DisallowConcurrentExecution
 public class BatchRequestJob extends AbstractLinearExecutionJob {
   private static final Logger LOG = LoggerFactory.getLogger(BatchRequestJob.class);
 
@@ -106,7 +109,7 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
             + ", execution_id = " + executionId
             + ", processed batch_id = " + batchId
             + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
-            + ", total tasks = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
+            + ", total tasks completed = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
       }
     }
   }
@@ -134,7 +137,7 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
 
   private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties,
                                         Map<String, Integer> oldCounts,
-                                        BatchRequestResponse batchRequestResponse) {
+                                        BatchRequestResponse batchRequestResponse) throws AmbariException {
 
     Map<String, Integer> taskCounts = new HashMap<String, Integer>();
 
@@ -147,10 +150,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
       Integer totalCount = oldCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY) +
         batchRequestResponse.getTotalTaskCount();
 
-      properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
       taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
-      properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
       taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
+
+      properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
+      properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
     }
 
     return taskCounts;

+ 57 - 6
ambari-server/src/test/java/org/apache/ambari/server/state/scheduler/BatchRequestJobTest.java

@@ -18,20 +18,29 @@
 
 package org.apache.ambari.server.state.scheduler;
 
-import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
-import org.apache.ambari.server.scheduler.AbstractLinearExecutionJob;
 import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
 import org.easymock.Capture;
 import org.junit.Assert;
 import org.junit.Test;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
-
-import java.lang.reflect.Method;
+import org.quartz.JobKey;
+import org.quartz.Trigger;
 import java.util.HashMap;
 import java.util.Map;
-
-import static org.easymock.EasyMock.*;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.captureLong;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 
 public class BatchRequestJobTest {
 
@@ -93,4 +102,46 @@ public class BatchRequestJobTest {
     Assert.assertEquals(batchId, batchIdCapture.getValue().longValue());
     Assert.assertEquals(clusterName, clusterNameCapture.getValue());
   }
+
+  @Test
+  public void testTaskCountsPersistedWithTrigger() throws Exception {
+    ExecutionScheduleManager scheduleManagerMock = createNiceMock
+      (ExecutionScheduleManager.class);
+    BatchRequestJob batchRequestJobMock = createMockBuilder
+      (BatchRequestJob.class).withConstructor(scheduleManagerMock, 100L)
+      .addMockedMethods("doWork")
+      .createMock();
+    JobExecutionContext executionContext = createNiceMock(JobExecutionContext.class);
+    JobDataMap jobDataMap = createNiceMock(JobDataMap.class);
+    JobDetail jobDetail = createNiceMock(JobDetail.class);
+    Map<String, Object> properties = new HashMap<String, Object>();
+    properties.put(BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY, 10);
+    properties.put(BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY, 20);
+
+    expect(scheduleManagerMock.continueOnMisfire(executionContext)).andReturn(true);
+    expect(executionContext.getMergedJobDataMap()).andReturn(jobDataMap);
+    expect(executionContext.getJobDetail()).andReturn(jobDetail);
+    expect(jobDetail.getKey()).andReturn(JobKey.jobKey("testJob", "testGroup"));
+    expect(jobDataMap.getWrappedMap()).andReturn(properties);
+    expect(jobDataMap.getString((String) anyObject())).andReturn("testJob").anyTimes();
+
+    Capture<Trigger> triggerCapture = new Capture<Trigger>();
+    scheduleManagerMock.scheduleJob(capture(triggerCapture));
+    expectLastCall().once();
+
+    replay(scheduleManagerMock, executionContext, jobDataMap, jobDetail);
+
+    batchRequestJobMock.execute(executionContext);
+
+    verify(scheduleManagerMock, executionContext, jobDataMap, jobDetail);
+
+    Trigger trigger = triggerCapture.getValue();
+    Assert.assertNotNull(trigger);
+    JobDataMap savedMap = trigger.getJobDataMap();
+    Assert.assertNotNull(savedMap);
+    Assert.assertEquals(10, savedMap.getIntValue
+      (BatchRequestJob.BATCH_REQUEST_FAILED_TASKS_KEY));
+    Assert.assertEquals(20, savedMap.getIntValue
+      (BatchRequestJob.BATCH_REQUEST_TOTAL_TASKS_KEY));
+  }
 }