Browse Source

[AMBARI-24841] Pause after first batch (dsen) (#2554)

Dmitry Sen 7 years ago
parent
commit
3287fe267e

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestScheduleResourceProvider.java

@@ -81,6 +81,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
   public static final String TASK_FAILURE_TOLERANCE_PER_BATCH_PROPERTY_ID = "task_failure_tolerance_per_batch";
   public static final String TASK_FAILURE_TOLERANCE_LIMIT_PROPERTY_ID = "task_failure_tolerance_limit";
   public static final String REQUESTS_PROPERTY_ID = "requests";
+  public static final String PAUSE_AFTER_FIRST_BATCH_PROPERTY_ID = "pause_after_first_batch";
 
   public static final String TYPE_PROPERTY_ID = "type";
   public static final String URI_PROPERTY_ID = "uri";
@@ -126,6 +127,8 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
   public static final String TASK_FAILURE_TOLERANCE_PER_BATCH = PropertyHelper.getPropertyId(BATCH_SETTINGS, TASK_FAILURE_TOLERANCE_PER_BATCH_PROPERTY_ID);
   public static final String REQUESTS = PropertyHelper.getPropertyId(null, REQUESTS_PROPERTY_ID);
 
+  public static final String PAUSE_AFTER_FIRST_BATCH = PropertyHelper.getPropertyId(BATCH_SETTINGS, PAUSE_AFTER_FIRST_BATCH_PROPERTY_ID);
+
   public static final String TYPE = PropertyHelper.getPropertyId(null, TYPE_PROPERTY_ID);
   public static final String URI = PropertyHelper.getPropertyId(null, URI_PROPERTY_ID);
   public static final String ORDER_ID = PropertyHelper.getPropertyId(null, ORDER_ID_PROPERTY_ID);
@@ -167,6 +170,7 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
     BATCH_SEPARATION_IN_SECONDS,
     TASK_FAILURE_TOLERANCE,
     TASK_FAILURE_TOLERANCE_PER_BATCH,
+    PAUSE_AFTER_FIRST_BATCH,
     REQUESTS,
     TYPE,
     URI,
@@ -656,6 +660,10 @@ public class RequestScheduleResourceProvider extends AbstractControllerResourceP
                   (BATCH_SEPARATION_IN_SECONDS)) {
                 batchSettings.setBatchSeparationInSeconds(Integer.valueOf
                   ((String) batchMapEntry.getValue()));
+              } else if (batchMapEntry.getKey().equals
+                  (PAUSE_AFTER_FIRST_BATCH)) {
+                batchSettings.setPauseAfterFirstBatch(Boolean.valueOf
+                    ((String) batchMapEntry.getValue()));
               } else if (batchMapEntry.getKey().equals
                   (REQUESTS)) {
                 HashSet<Map<String, Object>> requestSet =

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestScheduleEntity.java

@@ -73,6 +73,9 @@ public class RequestScheduleEntity {
   @Column(name = "batch_toleration_limit_per_batch")
   private Integer batchTolerationLimitPerBatch;
 
+  @Column(name = "pause_after_first_batch")
+  private Boolean pauseAfterFirstBatch;
+
   @Column(name = "authenticated_user_id")
   private Integer authenticatedUserId;
 
@@ -338,4 +341,12 @@ public class RequestScheduleEntity {
   public void setBatchTolerationLimitPerBatch(Integer batchTolerationLimitPerBatch) {
     this.batchTolerationLimitPerBatch = batchTolerationLimitPerBatch;
   }
+
+  public Boolean isPauseAfterFirstBatch() {
+    return pauseAfterFirstBatch;
+  }
+
+  public void setPauseAfterFirstBatch(Boolean pauseAfterFirstBatch) {
+    this.pauseAfterFirstBatch = pauseAfterFirstBatch;
+  }
 }

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/scheduler/AbstractLinearExecutionJob.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.scheduler;
 
+import static org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_BATCH_ID_KEY;
 import static org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_CLUSTER_NAME_KEY;
 import static org.apache.ambari.server.state.scheduler.BatchRequestJob.BATCH_REQUEST_EXECUTION_ID_KEY;
 import static org.apache.ambari.server.state.scheduler.RequestExecution.Status.ABORTED;
@@ -129,6 +130,13 @@ public abstract class AbstractLinearExecutionJob implements ExecutionJob {
       return;
     }
 
+    try {
+      executionScheduleManager.pauseAfterBatchIfNeeded(jobDataMap.getLong(BATCH_REQUEST_EXECUTION_ID_KEY),
+          jobDataMap.getLong(BATCH_REQUEST_BATCH_ID_KEY), jobDataMap.getString(BATCH_REQUEST_CLUSTER_NAME_KEY));
+    } catch (AmbariException e) {
+      LOG.warn("Received exception while trying to auto pause the scheduled request execution :", e);
+    }
+
     String status = null;
     try {
       status = executionScheduleManager.getBatchRequestStatus(jobDataMap.getLong(BATCH_REQUEST_EXECUTION_ID_KEY), jobDataMap.getString(BATCH_REQUEST_CLUSTER_NAME_KEY));

+ 30 - 3
ambari-server/src/main/java/org/apache/ambari/server/scheduler/ExecutionScheduleManager.java

@@ -450,7 +450,7 @@ public class ExecutionScheduleManager {
         Collections.sort(batchRequests);
         ListIterator<BatchRequest> iterator = batchRequests.listIterator(batchRequests.size());
         String nextJobName = null;
-        long nextBatchOrderId = batchRequests.size();
+        long nextBatchOrderId = Integer.MAX_VALUE/2;
         while (nextBatchOrderId != startingBatchOrderId && iterator.hasPrevious()) {
           BatchRequest batchRequest = iterator.previous();
 
@@ -538,13 +538,13 @@ public class ExecutionScheduleManager {
         ListIterator<BatchRequest> iterator = batchRequests.listIterator();
         do {
           result = iterator.next();
-        } while (iterator.hasNext() &&
+        } while (iterator.hasNext() && result.getStatus() != null &&
                  HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus())) &&
                  !HostRoleStatus.ABORTED.name().equals(result.getStatus()));
       }
     }
 
-    if (result != null &&
+    if (result != null && result.getStatus() != null &&
       HostRoleStatus.getCompletedStates().contains(HostRoleStatus.valueOf(result.getStatus())) &&
       !HostRoleStatus.ABORTED.name().equals(result.getStatus())) {
       return null;
@@ -995,5 +995,32 @@ public class ExecutionScheduleManager {
     }
     return result.path(relativeUri);
   }
+
+  /**
+   * Checks if scheduled request should be auto paused and updates the status to PAUSED if it does.
+   * For now the condition is following: the current status is SCHEDULED,
+   * it's the first batch and the pauseAfterFirstBatch flag is TRUE
+   */
+  public void pauseAfterBatchIfNeeded(long executionId, long batchId, String clusterName) throws AmbariException {
+    Cluster cluster = clusters.getCluster(clusterName);
+    RequestExecution requestExecution = cluster.getAllRequestExecutions().get(executionId);
+
+    if (requestExecution == null) {
+      throw new AmbariException("Unable to find request schedule with id = "
+        + executionId);
+    }
+
+    Batch batch = requestExecution.getBatch();
+    if (batch != null) {
+      BatchSettings batchSettings = batch.getBatchSettings();
+      if (batchSettings != null) {
+        if (SCHEDULED.name().equals(requestExecution.getStatus()) && getFirstJobOrderId(requestExecution) == batchId &&
+            batchSettings.isPauseAfterFirstBatch()) {
+          LOG.info("Auto pausing the scheduled request after first batch. Scheduled request ID : " + executionId);
+          requestExecution.updateStatus(PAUSED);
+        }
+      }
+    }
+  }
 }
 

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/BatchSettings.java

@@ -24,6 +24,7 @@ public class BatchSettings {
   private Integer batchSeparationInSeconds;
   private Integer taskFailureTolerance;
   private Integer taskFailureTolerancePerBatch;
+  private Boolean pauseAfterFirstBatch = false;
 
   @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
   @JsonProperty("batch_separation_in_seconds")
@@ -51,6 +52,16 @@ public class BatchSettings {
     return taskFailureTolerancePerBatch;
   }
 
+  @JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+  @JsonProperty("pause_after_first_batch")
+  public Boolean isPauseAfterFirstBatch() {
+    return pauseAfterFirstBatch;
+  }
+
+  public void setPauseAfterFirstBatch(Boolean pauseAfterFirstBatch) {
+    this.pauseAfterFirstBatch = pauseAfterFirstBatch;
+  }
+
   public void setTaskFailureToleranceLimitPerBatch(Integer taskFailureTolerancePerBatch) {
     this.taskFailureTolerancePerBatch = taskFailureTolerancePerBatch;
   }

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/scheduler/RequestExecutionImpl.java

@@ -108,6 +108,7 @@ public class RequestExecutionImpl implements RequestExecution {
     batchSettings.setBatchSeparationInSeconds(requestScheduleEntity.getBatchSeparationInSeconds());
     batchSettings.setTaskFailureToleranceLimit(requestScheduleEntity.getBatchTolerationLimit());
     batchSettings.setTaskFailureToleranceLimitPerBatch(requestScheduleEntity.getBatchTolerationLimitPerBatch());
+    batchSettings.setPauseAfterFirstBatch(requestScheduleEntity.isPauseAfterFirstBatch());
 
     batch.setBatchSettings(batchSettings);
 
@@ -331,6 +332,7 @@ public class RequestExecutionImpl implements RequestExecution {
         requestScheduleEntity.setBatchSeparationInSeconds(settings.getBatchSeparationInSeconds());
         requestScheduleEntity.setBatchTolerationLimit(settings.getTaskFailureToleranceLimit());
         requestScheduleEntity.setBatchTolerationLimitPerBatch(settings.getTaskFailureToleranceLimitPerBatch());
+        requestScheduleEntity.setPauseAfterFirstBatch(settings.isPauseAfterFirstBatch());
       }
     }
   }

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog280.java

@@ -38,6 +38,7 @@ public class UpgradeCatalog280 extends AbstractUpgradeCatalog {
   private static final String REQUEST_SCHEDULE_TABLE_NAME = "requestschedule";
   protected static final String HOST_COMPONENT_STATE_TABLE = "hostcomponentstate";
   private static final String REQUEST_SCHEDULE_BATCH_TOLERATION_LIMIT_PER_BATCH_COLUMN_NAME = "batch_toleration_limit_per_batch";
+  private static final String REQUEST_SCHEDULE_PAUSE_AFTER_FIRST_BATCH_COLUMN_NAME = "pause_after_first_batch";
   protected static final String LAST_LIVE_STATE_COLUMN = "last_live_state";
 
   private static final String UPGRADE_TABLE = "upgrade";
@@ -83,6 +84,9 @@ public class UpgradeCatalog280 extends AbstractUpgradeCatalog {
     dbAccessor.addColumn(REQUEST_SCHEDULE_TABLE_NAME,
         new DBAccessor.DBColumnInfo(REQUEST_SCHEDULE_BATCH_TOLERATION_LIMIT_PER_BATCH_COLUMN_NAME, Short.class, null,
             null, true));
+    dbAccessor.addColumn(REQUEST_SCHEDULE_TABLE_NAME,
+        new DBAccessor.DBColumnInfo(REQUEST_SCHEDULE_PAUSE_AFTER_FIRST_BATCH_COLUMN_NAME, Boolean.class, null,
+            null, true));
 
   }
 

+ 1 - 0
ambari-server/src/main/resources/Ambari-DDL-Derby-CREATE.sql

@@ -364,6 +364,7 @@ CREATE TABLE requestschedule (
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
   batch_toleration_limit_per_batch smallint,
+  pause_after_first_batch BOOLEAN,
   authenticated_user_id INTEGER,
   create_user varchar(255),
   create_timestamp bigint,

+ 1 - 0
ambari-server/src/main/resources/Ambari-DDL-MySQL-CREATE.sql

@@ -384,6 +384,7 @@ CREATE TABLE requestschedule (
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
   batch_toleration_limit_per_batch smallint,
+  pause_after_first_batch VARCHAR(1),
   authenticated_user_id INTEGER,
   create_user varchar(255),
   create_timestamp bigint,

+ 1 - 0
ambari-server/src/main/resources/Ambari-DDL-Oracle-CREATE.sql

@@ -365,6 +365,7 @@ CREATE TABLE requestschedule (
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
   batch_toleration_limit_per_batch smallint,
+  pause_after_first_batch VARCHAR2(1),
   authenticated_user_id NUMBER(10),
   create_user VARCHAR2(255),
   create_timestamp NUMBER(19),

+ 1 - 0
ambari-server/src/main/resources/Ambari-DDL-Postgres-CREATE.sql

@@ -366,6 +366,7 @@ CREATE TABLE requestschedule (
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
   batch_toleration_limit_per_batch smallint,
+  pause_after_first_batch BOOL,
   authenticated_user_id INTEGER,
   create_user varchar(255),
   create_timestamp bigint,

+ 1 - 0
ambari-server/src/main/resources/Ambari-DDL-SQLAnywhere-CREATE.sql

@@ -363,6 +363,7 @@ CREATE TABLE requestschedule (
   batch_separation_seconds smallint,
   batch_toleration_limit smallint,
   batch_toleration_limit_per_batch smallint,
+  pause_after_first_batch VARCHAR(1),
   authenticated_user_id INTEGER,
   create_user VARCHAR(255),
   create_timestamp NUMERIC(19),

+ 1 - 0
ambari-server/src/main/resources/Ambari-DDL-SQLServer-CREATE.sql

@@ -369,6 +369,7 @@ CREATE TABLE requestschedule (
   batch_separation_seconds SMALLINT,
   batch_toleration_limit SMALLINT,
   batch_toleration_limit_per_batch SMALLINT,
+  pause_after_first_batch BIT,
   authenticated_user_id INTEGER,
   create_user VARCHAR(255),
   create_timestamp BIGINT,

+ 1 - 1
ambari-server/src/test/java/org/apache/ambari/server/scheduler/ExecutionScheduleManagerTest.java

@@ -847,7 +847,7 @@ public class ExecutionScheduleManagerTest {
     expect(batchRequestMock1.compareTo(batchRequestMock2)).andReturn(-1).anyTimes();
     expect(batchRequestMock2.compareTo(batchRequestMock1)).andReturn(1).anyTimes();
     expect(batchRequestMock2.getOrderId()).andReturn(3L).anyTimes();
-    expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.PENDING.name()).once();
+    expect(batchRequestMock2.getStatus()).andReturn(HostRoleStatus.PENDING.name()).anyTimes();
     expect(executionSchedulerMock.getJobDetail((JobKey) anyObject()))
       .andReturn(jobDetailMock).anyTimes();
     expect((List<Trigger>) executionSchedulerMock

+ 15 - 4
ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog280Test.java

@@ -56,6 +56,10 @@ public class UpgradeCatalog280Test {
     dbAccessor.addColumn(eq("requestschedule"), capture(perBatchLimitColumn));
     expectLastCall().once();
 
+    Capture<DBAccessor.DBColumnInfo> autoPauseColumn = newCapture(CaptureType.ALL);
+    dbAccessor.addColumn(eq("requestschedule"), capture(autoPauseColumn));
+    expectLastCall().once();
+
     dbAccessor.dropColumn(eq(HOST_COMPONENT_STATE_TABLE), eq(LAST_LIVE_STATE_COLUMN));
     expectLastCall().once();
 
@@ -70,12 +74,19 @@ public class UpgradeCatalog280Test {
     upgradeCatalog280.dbAccessor = dbAccessor;
     upgradeCatalog280.executeDDLUpdates();
 
-    DBAccessor.DBColumnInfo capturedBlueprintProvisioningStateColumn =
+    DBAccessor.DBColumnInfo perBatchLimitColumnInfo =
         perBatchLimitColumn.getValue();
     Assert.assertEquals("batch_toleration_limit_per_batch",
-        capturedBlueprintProvisioningStateColumn.getName());
-    Assert.assertEquals(null, capturedBlueprintProvisioningStateColumn.getDefaultValue());
-    Assert.assertEquals(Short.class, capturedBlueprintProvisioningStateColumn.getType());
+      perBatchLimitColumnInfo.getName());
+    Assert.assertEquals(null, perBatchLimitColumnInfo.getDefaultValue());
+    Assert.assertEquals(Short.class, perBatchLimitColumnInfo.getType());
+
+    DBAccessor.DBColumnInfo autoPauseColumnInfo =
+      autoPauseColumn.getValue();
+    Assert.assertEquals("pause_after_first_batch",
+      autoPauseColumnInfo.getName());
+    Assert.assertEquals(null, autoPauseColumnInfo.getDefaultValue());
+    Assert.assertEquals(Boolean.class, autoPauseColumnInfo.getType());
 
     DBAccessor.DBColumnInfo capturedUpgradeColumn = upgradePackStackColumn.getValue();
     Assert.assertEquals("upgrade_pack_stack_id", capturedUpgradeColumn.getName());