|
@@ -18,10 +18,10 @@
|
|
|
|
|
|
package org.apache.ambari.server.serveraction;
|
|
package org.apache.ambari.server.serveraction;
|
|
|
|
|
|
-import java.util.EnumSet;
|
|
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.Set;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
@@ -32,9 +32,10 @@ import org.apache.ambari.server.actionmanager.ActionDBAccessor;
|
|
import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
|
|
import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
|
|
import org.apache.ambari.server.actionmanager.HostRoleCommand;
|
|
import org.apache.ambari.server.actionmanager.HostRoleCommand;
|
|
import org.apache.ambari.server.actionmanager.HostRoleStatus;
|
|
import org.apache.ambari.server.actionmanager.HostRoleStatus;
|
|
-import org.apache.ambari.server.actionmanager.RequestStatus;
|
|
|
|
|
|
+import org.apache.ambari.server.actionmanager.Request;
|
|
import org.apache.ambari.server.agent.CommandReport;
|
|
import org.apache.ambari.server.agent.CommandReport;
|
|
import org.apache.ambari.server.agent.ExecutionCommand;
|
|
import org.apache.ambari.server.agent.ExecutionCommand;
|
|
|
|
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
|
|
import org.apache.ambari.server.utils.StageUtils;
|
|
import org.apache.ambari.server.utils.StageUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -123,7 +124,7 @@ public class ServerActionExecutor {
|
|
* @param sleepTimeMS the time (in milliseconds) to wait between polling the database for more tasks
|
|
* @param sleepTimeMS the time (in milliseconds) to wait between polling the database for more tasks
|
|
*/
|
|
*/
|
|
public ServerActionExecutor(ActionDBAccessor db, long sleepTimeMS) {
|
|
public ServerActionExecutor(ActionDBAccessor db, long sleepTimeMS) {
|
|
- this.serverHostName = StageUtils.getHostName();
|
|
|
|
|
|
+ serverHostName = StageUtils.getHostName();
|
|
this.db = db;
|
|
this.db = db;
|
|
this.sleepTimeMS = (sleepTimeMS < 1) ? POLLING_TIMEOUT_MS : sleepTimeMS;
|
|
this.sleepTimeMS = (sleepTimeMS < 1) ? POLLING_TIMEOUT_MS : sleepTimeMS;
|
|
}
|
|
}
|
|
@@ -231,20 +232,44 @@ public class ServerActionExecutor {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Cleans up orphaned shared data Maps due to completed or failed request contexts.
|
|
|
|
|
|
+ * Cleans up orphaned shared data Maps due to completed or failed request
|
|
|
|
+ * contexts. We are unable to use {@link Request#getStatus()} since this field
|
|
|
|
+ * is not populated in the database but, instead, calculated in realtime.
|
|
*/
|
|
*/
|
|
private void cleanRequestShareDataContexts() {
|
|
private void cleanRequestShareDataContexts() {
|
|
- // Clean out any orphaned request shared data contexts
|
|
|
|
- for (RequestStatus status : EnumSet.of(RequestStatus.FAILED, RequestStatus.COMPLETED)) {
|
|
|
|
- List<Long> requests = db.getRequestsByStatus(status, 100, true);
|
|
|
|
-
|
|
|
|
- if (requests != null) {
|
|
|
|
- synchronized (requestSharedDataMap) {
|
|
|
|
- for (Long requestId : requests) {
|
|
|
|
- requestSharedDataMap.remove(requestId);
|
|
|
|
|
|
+ // if the cache is empty, do nothing
|
|
|
|
+ if (requestSharedDataMap.isEmpty()) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ // for every item in the map, get the request and check its status
|
|
|
|
+ synchronized (requestSharedDataMap) {
|
|
|
|
+ Set<Long> requestIds = requestSharedDataMap.keySet();
|
|
|
|
+ List<Request> requests = db.getRequests(requestIds);
|
|
|
|
+ for (Request request : requests) {
|
|
|
|
+ // calcuate the status from the stages and then remove from the map if
|
|
|
|
+ // necessary
|
|
|
|
+ CalculatedStatus calculatedStatus = CalculatedStatus.statusFromStages(
|
|
|
|
+ request.getStages());
|
|
|
|
+
|
|
|
|
+ // calcuate the status of the request
|
|
|
|
+ HostRoleStatus status = calculatedStatus.getStatus();
|
|
|
|
+
|
|
|
|
+ // remove the request from the map if the request is COMPLETED or
|
|
|
|
+ // FAILED
|
|
|
|
+ switch (status) {
|
|
|
|
+ case FAILED:
|
|
|
|
+ case COMPLETED:
|
|
|
|
+ requestSharedDataMap.remove(request.getRequestId());
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ } catch (Exception exception) {
|
|
|
|
+ LOG.warn("Unable to clear the server-side action request cache", exception);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -551,7 +576,7 @@ public class ServerActionExecutor {
|
|
* @param executionCommand the ExecutionCommand for the relevant task
|
|
* @param executionCommand the ExecutionCommand for the relevant task
|
|
*/
|
|
*/
|
|
private Worker(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand) {
|
|
private Worker(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand) {
|
|
- this.taskId = hostRoleCommand.getTaskId();
|
|
|
|
|
|
+ taskId = hostRoleCommand.getTaskId();
|
|
this.hostRoleCommand = hostRoleCommand;
|
|
this.hostRoleCommand = hostRoleCommand;
|
|
this.executionCommand = executionCommand;
|
|
this.executionCommand = executionCommand;
|
|
}
|
|
}
|