|
@@ -26,6 +26,7 @@ import org.apache.ambari.server.scheduler.ExecutionScheduleManager;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
|
|
|
public class BatchRequestJob extends AbstractLinearExecutionJob {
|
|
@@ -36,7 +37,11 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
|
|
|
public static final String BATCH_REQUEST_BATCH_ID_KEY =
|
|
|
"BatchRequestJob.BatchId";
|
|
|
public static final String BATCH_REQUEST_CLUSTER_NAME_KEY =
|
|
|
- "BatchRequestJob.ClusterName";
|
|
|
+ "BatchRequestJob.ClusterName";
|
|
|
+ public static final String BATCH_REQUEST_FAILED_TASKS_KEY =
|
|
|
+ "BatchRequestJob.FailedTaskCount";
|
|
|
+ public static final String BATCH_REQUEST_TOTAL_TASKS_KEY =
|
|
|
+ "BatchRequestJob.TotalTaskCount";
|
|
|
|
|
|
private final long statusCheckInterval;
|
|
|
|
|
@@ -63,18 +68,23 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
|
|
|
+ ", batch_id = " + batchId);
|
|
|
}
|
|
|
|
|
|
+ // Aggregate tasks counts stored in the DataMap
|
|
|
+ Map<String, Integer> taskCounts = getTaskCountProperties(properties);
|
|
|
+
|
|
|
Long requestId = executionScheduleManager.executeBatchRequest
|
|
|
(executionId, batchId, clusterName);
|
|
|
|
|
|
if (requestId != null) {
|
|
|
HostRoleStatus status;
|
|
|
+ BatchRequestResponse batchRequestResponse;
|
|
|
do {
|
|
|
- BatchRequestResponse batchRequestResponse =
|
|
|
- executionScheduleManager.getBatchRequestResponse(requestId, clusterName);
|
|
|
+ batchRequestResponse = executionScheduleManager
|
|
|
+ .getBatchRequestResponse(requestId, clusterName);
|
|
|
|
|
|
status = HostRoleStatus.valueOf(batchRequestResponse.getStatus());
|
|
|
|
|
|
- executionScheduleManager.updateBatchRequest(executionId, batchId, clusterName, batchRequestResponse, true);
|
|
|
+ executionScheduleManager.updateBatchRequest(executionId, batchId,
|
|
|
+ clusterName, batchRequestResponse, true);
|
|
|
|
|
|
try {
|
|
|
Thread.sleep(statusCheckInterval);
|
|
@@ -84,6 +94,57 @@ public class BatchRequestJob extends AbstractLinearExecutionJob {
|
|
|
throw new AmbariException(message, e);
|
|
|
}
|
|
|
} while (!status.isCompletedState());
|
|
|
+
|
|
|
+ // Store aggregated task status counts in the DataMap
|
|
|
+ Map<String, Integer> aggregateCounts = addTaskCountToProperties
|
|
|
+ (properties, taskCounts, batchRequestResponse);
|
|
|
+
|
|
|
+ if (executionScheduleManager.hasToleranceThresholdExceeded
|
|
|
+ (executionId, clusterName, aggregateCounts)) {
|
|
|
+
|
|
|
+ throw new AmbariException("Task failure tolerance limit exceeded"
|
|
|
+ + ", execution_id = " + executionId
|
|
|
+ + ", processed batch_id = " + batchId
|
|
|
+ + ", failed tasks = " + aggregateCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY)
|
|
|
+ + ", total tasks = " + aggregateCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Integer> addTaskCountToProperties(Map<String, Object> properties,
|
|
|
+ Map<String, Integer> oldCounts,
|
|
|
+ BatchRequestResponse batchRequestResponse) {
|
|
|
+
|
|
|
+ Map<String, Integer> taskCounts = new HashMap<String, Integer>();
|
|
|
+
|
|
|
+ if (batchRequestResponse != null) {
|
|
|
+ Integer failedTasks = batchRequestResponse.getFailedTaskCount() +
|
|
|
+ batchRequestResponse.getAbortedTaskCount() +
|
|
|
+ batchRequestResponse.getTimedOutTaskCount();
|
|
|
+
|
|
|
+ Integer failedCount = oldCounts.get(BATCH_REQUEST_FAILED_TASKS_KEY) + failedTasks;
|
|
|
+ Integer totalCount = oldCounts.get(BATCH_REQUEST_TOTAL_TASKS_KEY) +
|
|
|
+ batchRequestResponse.getTotalTaskCount();
|
|
|
+
|
|
|
+ properties.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
|
|
|
+ taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY, failedCount);
|
|
|
+ properties.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
|
|
|
+ taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, totalCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ return taskCounts;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Integer> getTaskCountProperties(Map<String, Object> properties) {
|
|
|
+ Map<String, Integer> taskCounts = new HashMap<String, Integer>();
|
|
|
+ if (properties != null) {
|
|
|
+ Object countObj = properties.get(BATCH_REQUEST_FAILED_TASKS_KEY);
|
|
|
+ taskCounts.put(BATCH_REQUEST_FAILED_TASKS_KEY,
|
|
|
+ countObj != null ? Integer.parseInt(countObj.toString()) : 0);
|
|
|
+ countObj = properties.get(BATCH_REQUEST_TOTAL_TASKS_KEY);
|
|
|
+ taskCounts.put(BATCH_REQUEST_TOTAL_TASKS_KEY, countObj != null ?
|
|
|
+ Integer.parseInt(countObj.toString()) : 0);
|
|
|
}
|
|
|
+ return taskCounts;
|
|
|
}
|
|
|
}
|