瀏覽代碼

AMBARI-7985. Add server side command functionality.
Allow tasks to be executed on the Ambari Server host.

Robert Levas 10 年之前
父節點
當前提交
3d397dc04a
共有 23 個文件被更改,包括 1581 次插入366 次删除
  1. 5 0
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
  2. 6 1
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
  3. 2 3
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
  4. 28 69
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
  5. 32 7
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
  6. 0 3
      ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java
  7. 11 0
      ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java
  8. 138 0
      ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java
  9. 58 14
      ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java
  10. 525 0
      ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java
  11. 0 32
      ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java
  12. 0 74
      ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java
  13. 2 0
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java
  14. 5 1
      ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java
  15. 76 0
      ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java
  16. 1 2
      ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java
  17. 75 5
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java
  18. 3 3
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java
  19. 271 124
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
  20. 2 4
      ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
  21. 1 24
      ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
  22. 92 0
      ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java
  23. 248 0
      ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java

+ 5 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java

@@ -148,6 +148,11 @@ public interface ActionDBAccessor {
    */
   public Collection<HostRoleCommand> getTasks(Collection<Long> taskIds);
 
+  /**
+   * Get a List of host role commands where the role and status are as specified
+   */
+  public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status);
+
   /**
    * Get all stages that contain tasks with specified host role statuses
    */

+ 6 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java

@@ -212,7 +212,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (clusterEntity != null) {
       clusterId = clusterEntity.getClusterId();
     }
-    
+
     requestEntity.setClusterId(clusterId);
     requestDAO.create(requestEntity);
 
@@ -549,6 +549,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return commands;
   }
 
+  @Override
+  public List<HostRoleCommand> getTasksByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) {
+    return getTasks(hostRoleCommandDAO.findTaskIdsByHostRoleAndStatus(hostname, role, status));
+  }
+
   @Override
   public List<Stage> getStagesByHostRoleStatus(Set<HostRoleStatus> statuses) {
     List<Stage> stages = new ArrayList<Stage>();

+ 2 - 3
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java

@@ -28,7 +28,6 @@ import org.apache.ambari.server.api.services.BaseRequest;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.ExecuteActionRequest;
 import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.utils.StageUtils;
 import org.slf4j.Logger;
@@ -60,12 +59,12 @@ public class ActionManager {
   public ActionManager(@Named("schedulerSleeptime") long schedulerSleepTime,
                        @Named("actionTimeout") long actionTimeout,
                        ActionQueue aq, Clusters fsm, ActionDBAccessor db, HostsMap hostsMap,
-                       ServerActionManager serverActionManager, UnitOfWork unitOfWork,
+                       UnitOfWork unitOfWork,
                        RequestFactory requestFactory, Configuration configuration) {
     this.actionQueue = aq;
     this.db = db;
     scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
-        actionQueue, fsm, 2, hostsMap, serverActionManager, unitOfWork, configuration);
+        actionQueue, fsm, 2, hostsMap, unitOfWork, configuration);
     requestCounter = new AtomicLong(
         db.getLastPersistedRequestIdWhenInitialized());
     this.requestFactory = requestFactory;

+ 28 - 69
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -43,8 +43,7 @@ import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.serveraction.ServerAction;
-import org.apache.ambari.server.serveraction.ServerActionManager;
+import org.apache.ambari.server.serveraction.ServerActionExecutor;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
@@ -93,7 +92,7 @@ class ActionScheduler implements Runnable {
   private boolean taskTimeoutAdjustment = true;
   private final HostsMap hostsMap;
   private final Object wakeupSyncObject = new Object();
-  private final ServerActionManager serverActionManager;
+  private final ServerActionExecutor serverActionExecutor;
   private final Configuration configuration;
 
   private final Set<Long> requestsInProgress = new HashSet<Long>();
@@ -126,7 +125,7 @@ class ActionScheduler implements Runnable {
 
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
-      int maxAttempts, HostsMap hostsMap, ServerActionManager serverActionManager,
+      int maxAttempts, HostsMap hostsMap,
       UnitOfWork unitOfWork, Configuration configuration) {
     this.sleepTime = sleepTimeMilliSec;
     this.hostsMap = hostsMap;
@@ -135,7 +134,7 @@ class ActionScheduler implements Runnable {
     this.actionQueue = actionQueue;
     this.fsmObject = fsmObject;
     this.maxAttempts = (short) maxAttempts;
-    this.serverActionManager = serverActionManager;
+    this.serverActionExecutor = new ServerActionExecutor(db, sleepTimeMilliSec);
     this.unitOfWork = unitOfWork;
     this.clusterHostInfoCache = CacheBuilder.newBuilder().
         expireAfterAccess(5, TimeUnit.MINUTES).
@@ -152,11 +151,19 @@ class ActionScheduler implements Runnable {
   public void start() {
     schedulerThread = new Thread(this);
     schedulerThread.start();
+
+    // Start up the ServerActionExecutor. Since it is directly related to the ActionScheduler it
+    // should be started and stopped along with it.
+    serverActionExecutor.start();
   }
 
   public void stop() {
     shouldRun = false;
     schedulerThread.interrupt();
+
+    // Stop the ServerActionExecutor. Since it is directly related to the ActionScheduler it should
+    // be started and stopped along with it.
+    serverActionExecutor.stop();
   }
 
   /**
@@ -216,7 +223,7 @@ class ActionScheduler implements Runnable {
         return;
       }
       int i_stage = 0;
-      
+
       stages = filterParallelPerHostStages(stages);
 
       boolean exclusiveRequestIsGoing = false;
@@ -285,18 +292,7 @@ class ActionScheduler implements Runnable {
         //Schedule what we have so far
 
         for (ExecutionCommand cmd : commandsToSchedule) {
-          if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
-            /**
-             * We don't forbid executing any stages in parallel with
-             * AMBARI_SERVER_ACTION. That  should be OK as AMBARI_SERVER_ACTION
-             * is not used as of now. The general motivation has been to update
-             * Request status when last task associated with the
-             * Request is finished.
-             */
-            executeServerAction(s, cmd);
-          } else {
             processHostRole(s, cmd, commandsToStart, commandsToUpdate);
-          }
         }
 
         LOG.debug("==> Commands to start: {}", commandsToStart.size());
@@ -347,7 +343,12 @@ class ActionScheduler implements Runnable {
 
         LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size());
         for (ExecutionCommand cmd : commandsToUpdate) {
-          actionQueue.enqueue(cmd.getHostname(), cmd);
+          // Do not queue up server actions; however if we encounter one, wake up the ServerActionExecutor
+          if (Role.AMBARI_SERVER_ACTION.toString().equals(cmd.getRole())) {
+            serverActionExecutor.awake();
+          } else {
+            actionQueue.enqueue(cmd.getHostname(), cmd);
+          }
         }
         LOG.debug("==> Finished.");
 
@@ -403,32 +404,6 @@ class ActionScheduler implements Runnable {
     return true;
   }
 
-  /**
-   * Executes internal ambari-server action
-   */
-  private void executeServerAction(Stage s, ExecutionCommand cmd) {
-    try {
-      LOG.trace("Executing server action: request_id={}, stage_id={}, task_id={}",
-        s.getRequestId(), s.getStageId(), cmd.getTaskId());
-      long now = System.currentTimeMillis();
-      String hostName = cmd.getHostname();
-      String roleName = cmd.getRole();
-
-      s.setStartTime(hostName, roleName, now);
-      s.setLastAttemptTime(hostName, roleName, now);
-      s.incrementAttemptCount(hostName, roleName);
-      s.setHostRoleStatus(hostName, roleName, HostRoleStatus.QUEUED);
-      db.hostRoleScheduled(s, hostName, roleName);
-      String actionName = cmd.getRoleParams().get(ServerAction.ACTION_NAME);
-      this.serverActionManager.executeAction(actionName, cmd.getCommandParams());
-      reportServerActionSuccess(s, cmd);
-
-    } catch (AmbariException e) {
-      LOG.warn("Could not execute server action " + cmd.toString(), e);
-      reportServerActionFailure(s, cmd, e.getMessage());
-    }
-  }
-
   private boolean hasPreviousStageFailed(Stage stage) {
     boolean failed = false;
     long prevStageId = stage.getStageId() - 1;
@@ -477,26 +452,6 @@ class ActionScheduler implements Runnable {
     return failed;
   }
 
-  private void reportServerActionSuccess(Stage stage, ExecutionCommand cmd) {
-    CommandReport report = new CommandReport();
-    report.setStatus(HostRoleStatus.COMPLETED.toString());
-    report.setExitCode(0);
-    report.setStdOut("Server action succeeded");
-    report.setStdErr("");
-    db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
-            cmd.getRole(), report);
-  }
-
-  private void reportServerActionFailure(Stage stage, ExecutionCommand cmd, String message) {
-    CommandReport report = new CommandReport();
-    report.setStatus(HostRoleStatus.FAILED.toString());
-    report.setExitCode(1);
-    report.setStdOut("Server action failed");
-    report.setStdErr(message);
-    db.updateHostRoleState(cmd.getHostname(), stage.getRequestId(), stage.getStageId(),
-            cmd.getRole(), report);
-  }
-
   /**
    * @return Stats for the roles in the stage. It is used to determine whether stage
    * has succeeded or failed.
@@ -569,12 +524,12 @@ class ActionScheduler implements Runnable {
 
         // Check that service host component is not deleted
         if (hostDeleted) {
-          
+
           String message = String.format(
             "Host not found when trying to schedule an execution command. " +
             "The most probable reason for that is that host or host component " +
             "has been deleted recently. The command has been aborted and dequeued." +
-            "Execution command details: " + 
+            "Execution command details: " +
             "cmdId: %s; taskId: %s; roleCommand: %s",
             c.getCommandId(), c.getTaskId(), c.getRoleCommand());
           LOG.warn("Host {} has been detected as non-available. {}", host, message);
@@ -772,7 +727,7 @@ class ActionScheduler implements Runnable {
     }
 
     cmd.setClusterHostInfo(clusterHostInfo);
- 
+
     //Try to get commandParams from cache and merge them with command-level parameters
     Map<String, String> commandParams = commandParamsStageCache.getIfPresent(stagePk);
 
@@ -888,12 +843,16 @@ class ActionScheduler implements Runnable {
       LOG.error("Unknown status " + status.name());
     }
   }
-  
-  
+
+
   public void setTaskTimeoutAdjustment(boolean val) {
     this.taskTimeoutAdjustment = val;
   }
 
+  ServerActionExecutor getServerActionExecutor() {
+    return serverActionExecutor;
+  }
+
   static class RoleStats {
     int numInProgress;
     int numQueued = 0;

+ 32 - 7
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -39,7 +40,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
 import org.apache.ambari.server.serveraction.ServerAction;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
-import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -302,14 +303,38 @@ public class Stage {
 
 
   /**
-   *  Creates server-side execution command. As of now, it seems to
-   *  be used only for server upgrade
+   * Creates server-side execution command.
+   * <p/>
+   * The action name for this command is expected to be the classname of a
+   * {@link org.apache.ambari.server.serveraction.ServerAction} implementation which will be
+   * instantiated and invoked as needed.
+   *
+   * @param actionName    a String declaring the action name (in the form of a classname) to execute
+   * @param role          the Role for this command
+   * @param command       the RoleCommand for this command
+   * @param clusterName   a String identifying the cluster on which to to execute this command
+   * @param event         a ServiceComponentHostServerActionEvent
+   * @param commandParams a Map of String to String data used to pass to the action - this may be
+   *                      empty or null if no data is relevant
+   * @param timeout       an Integer declaring the timeout for this action - if null, a default
+   *                      timeout will be used
    */
-  public synchronized void addServerActionCommand(String actionName, Role role,  RoleCommand command, String clusterName,
-      ServiceComponentHostUpgradeEvent event, String hostName) {
-    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, hostName, role, command, event);
+  public synchronized void addServerActionCommand(String actionName, Role role, RoleCommand command,
+                                                  String clusterName, ServiceComponentHostServerActionEvent event,
+                                                  @Nullable Map<String, String> commandParams,
+                                                  @Nullable Integer timeout) {
+    ExecutionCommandWrapper commandWrapper = addGenericExecutionCommand(clusterName, StageUtils.getHostName(), role, command, event);
     ExecutionCommand cmd = commandWrapper.getExecutionCommand();
-    
+
+    Map<String, String> cmdParams = new HashMap<String, String>();
+    if (commandParams != null) {
+      cmdParams.putAll(commandParams);
+    }
+    if (timeout != null) {
+      cmdParams.put(ExecutionCommand.KeyNames.COMMAND_TIMEOUT, NumberFormat.getIntegerInstance().format(timeout));
+    }
+    cmd.setCommandParams(cmdParams);
+
     Map<String, String> roleParams = new HashMap<String, String>();
     roleParams.put(ServerAction.ACTION_NAME, actionName);
     cmd.setRoleParams(roleParams);

+ 0 - 3
ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java

@@ -66,8 +66,6 @@ import org.apache.ambari.server.scheduler.ExecutionScheduler;
 import org.apache.ambari.server.scheduler.ExecutionSchedulerImpl;
 import org.apache.ambari.server.security.SecurityHelper;
 import org.apache.ambari.server.security.SecurityHelperImpl;
-import org.apache.ambari.server.serveraction.ServerActionManager;
-import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -238,7 +236,6 @@ public class ControllerModule extends AbstractModule {
     bind(AmbariManagementController.class)
         .to(AmbariManagementControllerImpl.class);
     bind(AbstractRootServiceResponseFactory.class).to(RootServiceResponseFactory.class);
-    bind(ServerActionManager.class).to(ServerActionManagerImpl.class);
     bind(ExecutionScheduler.class).to(ExecutionSchedulerImpl.class);
     bind(DBAccessor.class).to(DBAccessorImpl.class);
     bind(ViewInstanceHandlerList.class).to(AmbariHandlerList.class);

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java

@@ -126,6 +126,17 @@ public class HostRoleCommandDAO {
     return daoUtils.selectList(query, requestIds, taskIds);
   }
 
+  @RequiresSession
+  public List<Long> findTaskIdsByHostRoleAndStatus(String hostname, String role, HostRoleStatus status) {
+    TypedQuery<Long> query = entityManagerProvider.get().createQuery(
+        "SELECT DISTINCT task.taskId FROM HostRoleCommandEntity task " +
+            "WHERE task.hostName=?1 AND task.role=?2 AND task.status=?3 " +
+            "ORDER BY task.taskId", Long.class
+    );
+
+    return daoUtils.selectList(query, hostname, role, status);
+  }
+
   @RequiresSession
   public List<HostRoleCommandEntity> findSortedCommandsByStageAndHost(StageEntity stageEntity, HostEntity hostEntity) {
     TypedQuery<HostRoleCommandEntity> query = entityManagerProvider.get().createQuery("SELECT hostRoleCommand " +

+ 138 - 0
ambari-server/src/main/java/org/apache/ambari/server/serveraction/AbstractServerAction.java

@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.utils.StageUtils;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * AbstractServerActionImpl is an abstract implementation of a ServerAction.
+ * <p/>
+ * This abstract implementation provides common facilities for all ServerActions, such as
+ * maintaining the ExecutionCommand and HostRoleCommand properties. It also provides a convenient
+ * way to generate CommandReports for reporting status.
+ */
+public abstract class AbstractServerAction implements ServerAction {
+  /**
+   * The ExecutionCommand containing data related to this ServerAction implementation
+   */
+  private ExecutionCommand executionCommand = null;
+
+  /**
+   * The HostRoleCommand containing data related to this ServerAction implementation
+   */
+  private HostRoleCommand hostRoleCommand = null;
+
+  @Override
+  public ExecutionCommand getExecutionCommand() {
+    return this.executionCommand;
+  }
+
+  @Override
+  public void setExecutionCommand(ExecutionCommand executionCommand) {
+    this.executionCommand = executionCommand;
+  }
+
+  @Override
+  public HostRoleCommand getHostRoleCommand() {
+    return this.hostRoleCommand;
+  }
+
+  @Override
+  public void setHostRoleCommand(HostRoleCommand hostRoleCommand) {
+    this.hostRoleCommand = hostRoleCommand;
+  }
+
+  /**
+   * Creates a CommandReport used to report back to Ambari the status of this ServerAction.
+   *
+   * @param exitCode      an integer value declaring the exit code for this action - 0 typically
+   *                      indicates success.
+   * @param status        a HostRoleStatus indicating the status of this action
+   * @param structuredOut a String containing the (typically) JSON-formatted data representing the
+   *                      output from this action (this data is stored in the database, along with
+   *                      the command status)
+   * @param stdout        A string containing the data from the standard out stream (this data is stored in
+   *                      the database, along with the command status)
+   * @param stderr        A string containing the data from the standard error stream (this data is stored
+   *                      in the database, along with the command status)
+   * @return the generated CommandReport, or null if the HostRoleCommand or ExecutionCommand
+   * properties are missing
+   */
+  protected CommandReport createCommandReport(int exitCode, HostRoleStatus status, String structuredOut,
+                                              String stdout, String stderr) {
+    CommandReport report = null;
+
+    if (hostRoleCommand != null) {
+      if (executionCommand == null) {
+        ExecutionCommandWrapper wrapper = hostRoleCommand.getExecutionCommandWrapper();
+
+        if (wrapper != null) {
+          executionCommand = wrapper.getExecutionCommand();
+        }
+      }
+
+      if (executionCommand != null) {
+        RoleCommand roleCommand = executionCommand.getRoleCommand();
+
+        report = new CommandReport();
+
+        report.setActionId(StageUtils.getActionId(hostRoleCommand.getRequestId(), hostRoleCommand.getStageId()));
+        report.setClusterName(executionCommand.getClusterName());
+        report.setConfigurationTags(executionCommand.getConfigurationTags());
+        report.setRole(executionCommand.getRole());
+        report.setRoleCommand((roleCommand == null) ? null : roleCommand.toString());
+        report.setServiceName(executionCommand.getServiceName());
+        report.setTaskId(executionCommand.getTaskId());
+
+        report.setStructuredOut(structuredOut);
+        report.setStdErr((stderr == null) ? "" : stderr);
+        report.setStdOut((stdout == null) ? "" : stdout);
+        report.setStatus((status == null) ? null : status.toString());
+        report.setExitCode(exitCode);
+      }
+    }
+
+    return report;
+  }
+
+  /**
+   * Returns the command parameters value from the ExecutionCommand
+   * <p/>
+   * The returned map should be assumed to be read-only.
+   *
+   * @return the (assumed read-only) command parameters value from the ExecutionCommand
+   */
+  protected Map<String, String> getCommandParameters() {
+    if (executionCommand == null) {
+      return Collections.emptyMap();
+    } else {
+      return executionCommand.getCommandParams();
+    }
+  }
+
+}

+ 58 - 14
ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerAction.java

@@ -18,23 +18,67 @@
 
 package org.apache.ambari.server.serveraction;
 
-public class ServerAction {
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * ServerAction is an interface to be implemented by all server-based actions/tasks.
+ */
+public interface ServerAction {
 
   public static final String ACTION_NAME = "ACTION_NAME";
 
+
+  /**
+   * Gets the ExecutionCommand property of this ServerAction.
+   *
+   * @return the ExecutionCommand property of this ServerAction
+   */
+  ExecutionCommand getExecutionCommand();
+
+  /**
+   * Sets the ExecutionCommand property of this ServerAction.
+   * <p/>
+   * This property is expected to be set by the creator of this ServerAction before calling execute.
+   *
+   * @param command the ExecutionCommand data to set
+   */
+  void setExecutionCommand(ExecutionCommand command);
+
+
+  /**
+   * Gets the HostRoleCommand property of this ServerAction.
+   *
+   * @return the HostRoleCommand property of this ServerAction
+   */
+  HostRoleCommand getHostRoleCommand();
+
+  /**
+   * Sets the HostRoleCommand property of this ServerAction.
+   * <p/>
+   * This property is expected to be set by the creator of this ServerAction before calling execute.
+   *
+   * @param hostRoleCommand the HostRoleCommand data to set
+   */
+  void setHostRoleCommand(HostRoleCommand hostRoleCommand);
+
   /**
-   * The commands supported by the server. A command is a named alias to the
-   * action implementation at the server
+   * Executes this ServerAction
+   * <p/>
+   * This is typically called by the ServerActionExecutor in it's own thread, but there is no
+   * guarantee that this is the case.  It is expected that the ExecutionCommand and HostRoleCommand
+   * properties are set before calling this method.
+   *
+   * @param requestSharedDataContext a Map to be used a shared data among all ServerActions related
+   *                                 to a given request
+   * @return a CommandReport declaring the status of the task
+   * @throws AmbariException
+   * @throws InterruptedException
    */
-  public static class Command {
-    /**
-     * Finalize the upgrade request
-     */
-    public static final String FINALIZE_UPGRADE = "FINALIZE_UPGRADE";
-  }
-
-  public static class PayloadName {
-    public final static String CURRENT_STACK_VERSION = "current_stack_version";
-    public final static String CLUSTER_NAME = "cluster_name";
-  }
+  CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext)
+      throws AmbariException, InterruptedException;
 }

+ 525 - 0
ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionExecutor.java

@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.actionmanager.*;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.utils.StageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Server Action Executor used to execute server-side actions (or tasks)
+ * <p/>
+ * The ServerActionExecutor executes in its own thread, polling for AMBARI_SERVER_ACTION
+ * HostRoleCommands queued for execution.  It is expected that this thread is managed by the
+ * ActionScheduler such that it is started when the ActionScheduler is started and stopped when the
+ * ActionScheduler is stopped.
+ */
+public class ServerActionExecutor {
+
+  private final static Logger LOG = LoggerFactory.getLogger(ServerActionExecutor.class);
+  private final static Long EXECUTION_TIMEOUT_MS = 1000L * 60 * 5;
+  private final static Long POLLING_TIMEOUT_MS = 1000L * 5;
+
+  /**
+   * Maps request IDs to "blackboards" of shared data.
+   * <p/>
+   * This map is not synchronized, so any access to it should synchronize on
+   * requestSharedDataMap object
+   */
+  private final Map<Long, ConcurrentMap<String, Object>> requestSharedDataMap =
+      new HashMap<Long, ConcurrentMap<String, Object>>();
+
+  /**
+   * The hostname of the (Ambari) server.
+   * <p/>
+   * This hostname is cached so that cycles are spent querying for it more than once.
+   */
+  private final String serverHostName;
+
+  /**
+   * Database accessor to query and update the database of action commands.
+   */
+  private final ActionDBAccessor db;
+
+  /**
+   * Internal locking object used to manage access to activeAwakeRequest.
+   */
+  private final Object wakeupSyncObject = new Object();
+
+  /**
+   * Timeout (in milliseconds) used to throttle polling of database for new action commands.
+   */
+  private final long sleepTimeMS;
+
+  /**
+   * Flag used to help keep thing moving in the event an "awake" request was encountered while busy
+   * handing an action.
+   */
+  private boolean activeAwakeRequest = false;
+
+  /**
+   * A reference to the Thread handling the work for this ServerActionExecutor
+   */
+  private Thread executorThread = null;
+
+  /**
+   * Creates a new ServerActionExecutor
+   *
+   * @param db          the ActionDBAccessor to use to read and update tasks
+   * @param sleepTimeMS the time (in milliseconds) to wait between polling the database for more tasks
+   */
+  public ServerActionExecutor(ActionDBAccessor db, long sleepTimeMS) {
+    this.serverHostName = StageUtils.getHostName();
+    this.db = db;
+    this.sleepTimeMS = (sleepTimeMS < 1) ? POLLING_TIMEOUT_MS : sleepTimeMS;
+  }
+
+  /**
+   * Starts this ServerActionExecutor's main thread.
+   */
+  public void start() {
+    LOG.info("Starting Server Action Executor thread...");
+    executorThread = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        while (!Thread.interrupted()) {
+          try {
+            synchronized (wakeupSyncObject) {
+              if (!activeAwakeRequest) {
+                wakeupSyncObject.wait(sleepTimeMS);
+              }
+              activeAwakeRequest = false;
+            }
+
+            doWork();
+          } catch (InterruptedException e) {
+            LOG.warn("Server Action Executor thread interrupted, starting to shutdown...");
+            break;
+          }
+        }
+
+        LOG.info("Server Action Executor thread shutting down...");
+      }
+    }, "Server Action Executor");
+    executorThread.start();
+
+    if (executorThread.isAlive()) {
+      LOG.info("Server Action Executor thread started.");
+    }
+  }
+
+  /**
+   * Attempts to stop this ServerActionExecutor's main thread.
+   */
+  public void stop() {
+    LOG.info("Stopping Server Action Executor thread...");
+
+    if (executorThread != null) {
+      executorThread.interrupt();
+
+      // Wait for about 60 seconds for the thread to stop
+      for (int i = 0; i < 120; i++) {
+        try {
+          executorThread.join(500);
+        } catch (InterruptedException e) {
+          // Ignore this...
+        }
+
+        if (!executorThread.isAlive()) {
+          break;
+        }
+      }
+
+      if (!executorThread.isAlive()) {
+        executorThread = null;
+      }
+    }
+
+    if (executorThread == null) {
+      LOG.info("Server Action Executor thread stopped.");
+    } else {
+      LOG.warn("Server Action Executor thread hasn't stopped, giving up waiting.");
+    }
+  }
+
+  /**
+   * Attempts to force this ServerActionExecutor to wake up and do work.
+   * <p/>
+   * Should be called from another thread when we want scheduler to
+   * make a run ASAP (for example, to process desired configs of SCHs).
+   * The method is guaranteed to return quickly.
+   */
+  public void awake() {
+    synchronized (wakeupSyncObject) {
+      activeAwakeRequest = true;
+      wakeupSyncObject.notify();
+    }
+  }
+
+  /**
+   * Returns a Map to be used to share data among server actions within a given request context.
+   *
+   * @param requestId a long identifying the id of the relevant request
+   * @return a ConcurrentMap of "shared" data
+   */
+  private ConcurrentMap<String, Object> getRequestSharedDataContext(long requestId) {
+    synchronized (requestSharedDataMap) {
+      ConcurrentMap<String, Object> map = requestSharedDataMap.get(requestId);
+
+      if (map == null) {
+        map = new ConcurrentHashMap<String, Object>();
+        requestSharedDataMap.put(requestId, map);
+      }
+
+      return map;
+    }
+  }
+
+  /**
+   * Cleans up orphaned shared data Maps due to completed or failed request contexts.
+   */
+  private void cleanRequestShareDataContexts() {
+    // Clean out any orphaned request shared data contexts
+    for (RequestStatus status : EnumSet.of(RequestStatus.FAILED, RequestStatus.COMPLETED)) {
+      List<Long> requests = db.getRequestsByStatus(status, 100, true);
+
+      if (requests != null) {
+        synchronized (requestSharedDataMap) {
+          for (Long requestId : requests) {
+            requestSharedDataMap.remove(requestId);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * A helper method to create CommandReports indicating the action/task is in progress
+   *
+   * @return a new CommandReport
+   */
+  private CommandReport createInProgressReport() {
+    CommandReport commandReport = new CommandReport();
+    commandReport.setStatus(HostRoleStatus.IN_PROGRESS.toString());
+    commandReport.setStdErr("");
+    commandReport.setStdOut("");
+    return commandReport;
+  }
+
+  /**
+   * A helper method to create CommandReports indicating the action/task had timed out
+   *
+   * @return a new CommandReport
+   */
+  private CommandReport createTimedOutReport() {
+    CommandReport commandReport = new CommandReport();
+    commandReport.setStatus(HostRoleStatus.TIMEDOUT.toString());
+    commandReport.setStdErr("");
+    commandReport.setStdOut("");
+    return commandReport;
+  }
+
+  /**
+   * A helper method to create CommandReports indicating the action/task has had an error
+   *
+   * @param message a String containing the error message to report
+   * @return a new CommandReport
+   */
+  private CommandReport createErrorReport(String message) {
+    CommandReport commandReport = new CommandReport();
+    commandReport.setStatus(HostRoleStatus.FAILED.toString());
+    commandReport.setExitCode(1);
+    commandReport.setStdOut("Server action failed");
+    commandReport.setStdErr(message);
+    return commandReport;
+  }
+
+  /**
+   * Stores the status of the task/action
+   * <p/>
+   * If the command report is not specified (null), an error report will be created.
+   *
+   * @param hostRoleCommand  the HostRoleCommand for the relevant task
+   * @param executionCommand the ExecutionCommand for the relevant task
+   * @param commandReport    the CommandReport to store
+   */
+  private void updateHostRoleState(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand,
+                                   CommandReport commandReport) {
+    if (commandReport == null) {
+      commandReport = createErrorReport("Unknown error condition");
+    }
+
+    db.updateHostRoleState(executionCommand.getHostname(), hostRoleCommand.getRequestId(),
+        hostRoleCommand.getStageId(), executionCommand.getRole(), commandReport);
+  }
+
+  /**
+   * Determine what the timeout for this action/task should be.
+   * <p/>
+   * If the timeout value is not set in the command parameter map (under the key
+   * ExecutionCommand.KeyNames.COMMAND_TIMEOUT or "command_timeout", the default timeout value will
+   * be used.  It is expected that the timeout value stored in the command parameter map (if any) is
+   * in seconds.
+   *
+   * @param executionCommand the ExecutionCommand for the relevant task
+   * @return a long declaring the action/task's timeout
+   */
+  private long determineTimeout(ExecutionCommand executionCommand) {
+    Map<String, String> params = executionCommand.getCommandParams();
+    String paramsTimeout = (params == null) ? null : params.get(ExecutionCommand.KeyNames.COMMAND_TIMEOUT);
+    Long timeout;
+
+    try {
+      timeout = (paramsTimeout == null)
+          ? null
+          : (Long.parseLong(paramsTimeout) * 1000); // Convert seconds to milliseconds
+    } catch (NumberFormatException e) {
+      timeout = null;
+    }
+
+    return (timeout == null)
+        ? EXECUTION_TIMEOUT_MS
+        : ((timeout < 0) ? 0 : timeout);
+  }
+
+  /**
+   * Execute the logic to handle each task in the queue in the order in which it was queued.
+   * <p/>
+   * A single task is executed at one time, allowing for a specified (ExecutionCommand.KeyNames.COMMAND_TIMEOUT)
+   * or the default timeout for it to complete before considering the task timed out.
+   *
+   * @throws InterruptedException
+   */
+  public void doWork() throws InterruptedException {
+    List<HostRoleCommand> tasks = db.getTasksByHostRoleAndStatus(serverHostName,
+        Role.AMBARI_SERVER_ACTION.toString(), HostRoleStatus.QUEUED);
+
+    if ((tasks != null) && !tasks.isEmpty()) {
+      for (HostRoleCommand task : tasks) {
+        Long taskId = task.getTaskId();
+
+        LOG.debug("Processing task #{}", taskId);
+
+        if (task.getStatus() == HostRoleStatus.QUEUED) {
+          ExecutionCommandWrapper executionWrapper = task.getExecutionCommandWrapper();
+
+          if (executionWrapper != null) {
+            ExecutionCommand executionCommand = executionWrapper.getExecutionCommand();
+
+            if (executionCommand != null) {
+              // For now, execute only one task at a time. This may change in the future in the
+              // event it is discovered that this is a bottleneck. Since this implementation may
+              // change, it should be noted from outside of this class, that there is no expectation
+              // that tasks will be processed in order or serially.
+              Worker worker = new Worker(task, executionCommand);
+              Thread workerThread = new Thread(worker, String.format("Server Action Executor Worker %s", taskId));
+              Long timeout = determineTimeout(executionCommand);
+
+              updateHostRoleState(task, executionCommand, createInProgressReport());
+
+              LOG.debug("Starting Server Action Executor Worker thread for task #{}.", taskId);
+              workerThread.start();
+
+              try {
+                workerThread.join(timeout);
+              } catch (InterruptedException e) {
+                // Make sure the workerThread is interrupted as well.
+                workerThread.interrupt();
+                throw e;
+              }
+
+              if (workerThread.isAlive()) {
+                LOG.debug("Server Action Executor Worker thread for task #{} timed out - it failed to complete within {} ms.",
+                    taskId, timeout);
+                workerThread.interrupt();
+                updateHostRoleState(task, executionCommand, createTimedOutReport());
+              } else {
+                LOG.debug("Server Action Executor Worker thread for task #{} exited on its own.", taskId);
+                updateHostRoleState(task, executionCommand, worker.getCommandReport());
+              }
+            } else {
+              LOG.warn("Task #{} failed to produce an ExecutionCommand, skipping.", taskId);
+            }
+          } else {
+            LOG.warn("Task #{} failed to produce an ExecutionCommandWrapper, skipping.", taskId);
+          }
+        } else {
+          LOG.warn("Queued task #{} is expected to have a status of {} but has a status of {}, skipping.",
+              taskId, HostRoleStatus.QUEUED, task.getStatus());
+        }
+      }
+    }
+
+    cleanRequestShareDataContexts();
+  }
+
+  /**
+   * Internal class to execute a unit of work in its own thread
+   */
+  private class Worker implements Runnable {
+    /**
+     * The task id of the relevant task
+     */
+    private final Long taskId;
+
+    /**
+     * The HostRoleCommand data used by this Worker to execute the task
+     */
+    private final HostRoleCommand hostRoleCommand;
+
+    /**
+     * The ExecutionCommand data used by this Worker to execute the task
+     */
+    private final ExecutionCommand executionCommand;
+
+    /**
+     * The resulting CommandReport used by the caller to update the status of the relevant task
+     */
+    private CommandReport commandReport = null;
+
+    @Override
+    public void run() {
+      try {
+        LOG.debug("Executing task #{}", taskId);
+
+        commandReport = execute(hostRoleCommand, executionCommand);
+
+        LOG.debug("Task #{} completed execution with status of {}",
+            taskId, (commandReport == null) ? "UNKNOWN" : commandReport.getStatus());
+      } catch (Throwable t) {
+        LOG.warn("Task #{} failed to complete execution due to thrown exception: {}:{}",
+            taskId, t.getClass().getName(), t.getLocalizedMessage());
+
+        commandReport = createErrorReport(t.getLocalizedMessage());
+      }
+    }
+
+    /**
+     * Returns the resulting CommandReport
+     *
+     * @return a CommandReport
+     */
+    public CommandReport getCommandReport() {
+      return commandReport;
+    }
+
+    /**
+     * Attempts to execute the task specified using data from the supplied HostRoleCommand and
+     * ExecutionCommand.
+     * <p/>
+     * Retrieves the role parameters from the supplied ExecutionCommand and queries it for the
+     * "ACTON_NAME" property.  The returned String is expected to be the classname of a ServerAction
+     * implementation.  If so, an instance of the implementation class is created and executed
+     * yielding a CommandReport to (eventually) return back to the parent thread.
+     *
+     * @param hostRoleCommand  The HostRoleCommand the HostRoleCommand for the relevant task
+     * @param executionCommand the ExecutionCommand for the relevant task
+     * @return the resulting CommandReport
+     * @throws AmbariException
+     * @throws InterruptedException
+     */
+    private CommandReport execute(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand)
+        throws AmbariException, InterruptedException {
+
+      if (hostRoleCommand == null) {
+        throw new AmbariException("Missing HostRoleCommand data");
+      } else if (executionCommand == null) {
+        throw new AmbariException("Missing ExecutionCommand data");
+      } else {
+        Map<String, String> roleParams = executionCommand.getRoleParams();
+
+        if (roleParams == null) {
+          throw new AmbariException("Missing RoleParams data");
+        } else {
+          String actionClassname = roleParams.get(ServerAction.ACTION_NAME);
+
+          if (actionClassname == null) {
+            throw new AmbariException("Missing action classname for server action");
+          } else {
+            ServerAction action = createServerAction(actionClassname);
+
+            if (action == null) {
+              throw new AmbariException("Failed to create server action: " + actionClassname);
+            } else {
+              // Set properties on the action:
+              action.setExecutionCommand(executionCommand);
+              action.setHostRoleCommand(hostRoleCommand);
+
+              return action.execute(getRequestSharedDataContext(hostRoleCommand.getRequestId()));
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * Attempts to create an instance of the ServerAction class implementation specified in
+     * classname.
+     *
+     * @param classname a String declaring the classname of the ServerAction class to instantiate
+     * @return the instantiated ServerAction implementation
+     * @throws AmbariException
+     */
+    private ServerAction createServerAction(String classname) throws AmbariException {
+      try {
+        Class<?> actionClass = Class.forName(classname);
+
+        if (actionClass == null) {
+          throw new AmbariException("Unable to load server action class: " + classname);
+        } else {
+          Class<? extends ServerAction> serverActionClass = actionClass.asSubclass(ServerAction.class);
+
+          if (serverActionClass == null) {
+            throw new AmbariException("Unable to execute server action class, invalid type: " + classname);
+          } else {
+            return serverActionClass.newInstance();
+          }
+        }
+      } catch (ClassNotFoundException e) {
+        throw new AmbariException("Unable to load server action class: " + classname, e);
+      } catch (InstantiationException e) {
+        throw new AmbariException("Unable to create an instance of the server action class: " + classname, e);
+      } catch (IllegalAccessException e) {
+        throw new AmbariException("Unable to create an instance of the server action class: " + classname, e);
+      }
+    }
+
+    /**
+     * Constructs a new Worker used to execute a task
+     *
+     * @param hostRoleCommand  the HostRoleCommand for the relevant task
+     * @param executionCommand the ExecutionCommand for the relevant task
+     */
+    private Worker(HostRoleCommand hostRoleCommand, ExecutionCommand executionCommand) {
+      this.taskId = hostRoleCommand.getTaskId();
+      this.hostRoleCommand = hostRoleCommand;
+      this.executionCommand = executionCommand;
+    }
+  }
+}

+ 0 - 32
ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManager.java

@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.serveraction;
-
-import org.apache.ambari.server.AmbariException;
-
-import java.util.Map;
-
-/**
- * Server action manager interface.
- */
-public interface ServerActionManager {
-
-  public void executeAction(String actionName, Map<String, String> payload)
-      throws AmbariException;
-}

+ 0 - 74
ambari-server/src/main/java/org/apache/ambari/server/serveraction/ServerActionManagerImpl.java

@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.serveraction;
-
-import com.google.inject.Singleton;
-import com.google.inject.Inject;
-import org.apache.ambari.server.AmbariException;
-import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.StackId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-/**
- * Server action manager implementation.
- */
-@Singleton
-public class ServerActionManagerImpl implements ServerActionManager {
-
-  private final static Logger LOG =
-      LoggerFactory.getLogger(ServerActionManagerImpl.class);
-
-  private Clusters clusters;
-
-  @Inject
-  public ServerActionManagerImpl(Clusters clusters) {
-    this.clusters = clusters;
-  }
-
-  @Override
-  public void executeAction(String actionName, Map<String, String> payload)
-      throws AmbariException {
-    LOG.info("Executing server action : "
-        + actionName + " with payload "
-        + payload);
-
-    if (actionName.equals(ServerAction.Command.FINALIZE_UPGRADE)) {
-      updateClusterStackVersion(payload);
-    } else {
-      throw new AmbariException("Unsupported action " + actionName);
-    }
-  }
-
-  private void updateClusterStackVersion(Map<String, String> payload) throws AmbariException {
-    if (payload == null
-        || !payload.containsKey(ServerAction.PayloadName.CLUSTER_NAME)
-        || !payload.containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION)) {
-      throw new AmbariException("Invalid payload.");
-    }
-
-    StackId currentStackId = new StackId(payload.get(ServerAction.PayloadName.CURRENT_STACK_VERSION));
-    final Cluster cluster = clusters.getCluster(payload.get(ServerAction.PayloadName.CLUSTER_NAME));
-    cluster.setCurrentStackVersion(currentStackId);
-    cluster.refresh();
-  }
-}

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEvent.java

@@ -122,6 +122,8 @@ public abstract class ServiceComponentHostEvent
         return new ServiceComponentHostDisableEvent(serviceComponentName, hostName, opTimestamp);
       case HOST_SVCCOMP_RESTORE:
         return new ServiceComponentHostRestoreEvent(serviceComponentName, hostName, opTimestamp);
+      case HOST_SVCCOMP_SERVER_ACTION:
+        return new ServiceComponentHostServerActionEvent(serviceComponentName, hostName, opTimestamp);
     }
     return null;
   }

+ 5 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/ServiceComponentHostEventType.java

@@ -74,6 +74,10 @@ public enum ServiceComponentHostEventType {
   /**
    * Recovering host component from disable state
    */
-  HOST_SVCCOMP_RESTORE
+  HOST_SVCCOMP_RESTORE,
+  /**
+   * Triggering a server-side action
+   */
+  HOST_SVCCOMP_SERVER_ACTION
 
 }

+ 76 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostServerActionEvent.java

@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.svccomphost;
+
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.ServiceComponentHostEventType;
+
+/**
+ * Base class for all events that represent server-side actions.
+ */
+public class ServiceComponentHostServerActionEvent extends
+    ServiceComponentHostEvent {
+
+  /**
+   * Constructs a new ServiceComponentHostServerActionEvent.
+   * <p/>
+   * This method is expected to be called by ether a ServiceComponentHostServerActionEvent or a
+   * class that extends it.
+   *
+   * @param type                 the ServiceComponentHostEventType - expected to be
+   *                             ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION
+   * @param serviceComponentName a String declaring the component for which this action is to be
+   *                             routed - expected to be "AMBARI_SERVER"
+   * @param hostName             a String declaring the host on which the action should be executed -
+   *                             expected to be the hostname of the Ambari server
+   * @param opTimestamp          the time in which this event was created
+   * @param stackId              the relevant stackid
+   */
+  protected ServiceComponentHostServerActionEvent(ServiceComponentHostEventType type,
+                                                  String serviceComponentName, String hostName,
+                                                  long opTimestamp, String stackId) {
+    super(type, serviceComponentName, hostName, opTimestamp, stackId);
+  }
+
+  /**
+   * Constructs a new ServiceComponentHostServerActionEvent where the component name is set to
+   * "AMBARI_SERVER" and the type is set to ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION.
+   *
+   * @param hostName    a String declaring the host on which the action should be executed -
+   *                    expected to be the hostname of the Ambari server
+   * @param opTimestamp the time in which this event was created
+   */
+  public ServiceComponentHostServerActionEvent(String hostName, long opTimestamp) {
+    this("AMBARI_SERVER", hostName, opTimestamp);
+  }
+
+  /**
+   * Constructs a new ServiceComponentHostServerActionEvent
+   *
+   * @param serviceComponentName a String declaring the name of component
+   * @param hostName             a String declaring the host on which the action should be executed -
+   *                             expected to be the hostname of the Ambari server
+   * @param opTimestamp          the time in which this event was created
+   */
+  public ServiceComponentHostServerActionEvent(String serviceComponentName, String hostName,
+                                               long opTimestamp) {
+    this(ServiceComponentHostEventType.HOST_SVCCOMP_SERVER_ACTION, serviceComponentName, hostName,
+        opTimestamp, "");
+  }
+}

+ 1 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostUpgradeEvent.java

@@ -18,11 +18,10 @@
 
 package org.apache.ambari.server.state.svccomphost;
 
-import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.ServiceComponentHostEventType;
 
 public class ServiceComponentHostUpgradeEvent extends
-    ServiceComponentHostEvent {
+    ServiceComponentHostServerActionEvent {
 
 
   public ServiceComponentHostUpgradeEvent(String serviceComponentName,

+ 75 - 5
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionDBAccessorImpl.java

@@ -43,6 +43,7 @@ import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
+import org.apache.ambari.server.serveraction.MockServerAction;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
 import org.apache.ambari.server.utils.StageUtils;
@@ -52,6 +53,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import static org.junit.Assert.assertEquals;
@@ -67,6 +69,10 @@ public class TestActionDBAccessorImpl {
   private String hostName = "host1";
   private String clusterName = "cluster1";
   private String actionName = "validate_kerberos";
+
+  private String serverHostName = StageUtils.getHostName(); // "_localhost_";
+  private String serverActionName = MockServerAction.class.getName();
+
   private Injector injector;
   ActionDBAccessor db;
   ActionManager am;
@@ -85,13 +91,18 @@ public class TestActionDBAccessorImpl {
       .with(new TestActionDBAccessorModule()));
     injector.getInstance(GuiceJpaInitializer.class);
     injector.injectMembers(this);
+
+    // Add this host's name since it is needed for server-side actions.
+    clusters.addHost(serverHostName);
+    clusters.getHost(serverHostName).persist();
+
     clusters.addHost(hostName);
     clusters.getHost(hostName).persist();
     clusters.addCluster(clusterName);
     db = injector.getInstance(ActionDBAccessorImpl.class);
 
     am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), null, injector.getInstance(UnitOfWork.class),
+        new HostsMap((String) null), injector.getInstance(UnitOfWork.class),
 		injector.getInstance(RequestFactory.class), null);
   }
 
@@ -157,7 +168,7 @@ public class TestActionDBAccessorImpl {
             "(command report status should be ignored)",
             HostRoleStatus.ABORTED,s.getHostRoleStatus(hostname, "HBASE_MASTER"));
   }
-  
+
   @Test
   public void testGetStagesInProgress() throws AmbariException {
     String hostname = "host1";
@@ -168,7 +179,7 @@ public class TestActionDBAccessorImpl {
     db.persistActions(request);
     assertEquals(2, stages.size());
   }
-  
+
   @Test
   public void testGetStagesInProgressWithFailures() throws AmbariException {
     String hostname = "host1";
@@ -267,6 +278,46 @@ public class TestActionDBAccessorImpl {
     assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
   }
 
+  @Test
+  public void testServerActionScheduled() throws InterruptedException, AmbariException {
+    populateActionDBWithServerAction(db, serverHostName, requestId, stageId);
+
+    final String roleName = Role.AMBARI_SERVER_ACTION.toString();
+    Stage stage = db.getStage(StageUtils.getActionId(requestId, stageId));
+    assertEquals(HostRoleStatus.PENDING, stage.getHostRoleStatus(serverHostName, roleName));
+    List<HostRoleCommandEntity> entities =
+        hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName);
+
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+    stage.setHostRoleStatus(serverHostName, roleName, HostRoleStatus.QUEUED);
+
+    entities = hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName);
+    assertEquals(HostRoleStatus.QUEUED, stage.getHostRoleStatus(serverHostName, roleName));
+    assertEquals(HostRoleStatus.PENDING, entities.get(0).getStatus());
+
+    db.hostRoleScheduled(stage, serverHostName, roleName);
+
+    entities = hostRoleCommandDAO.findByHostRole(
+        serverHostName, requestId, stageId, roleName);
+    assertEquals(HostRoleStatus.QUEUED, entities.get(0).getStatus());
+
+
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        Stage stage1 = db.getStage("23-31");
+        stage1.setHostRoleStatus(serverHostName, roleName, HostRoleStatus.COMPLETED);
+        db.hostRoleScheduled(stage1, serverHostName, roleName);
+      }
+    };
+
+    thread.start();
+    thread.join();
+
+    entities = hostRoleCommandDAO.findByHostRole(serverHostName, requestId, stageId, roleName);
+    assertEquals("Concurrent update failed", HostRoleStatus.COMPLETED, entities.get(0).getStatus());
+  }
+
   @Test
   public void testUpdateHostRole() throws Exception {
     populateActionDB(db, hostName, requestId, stageId);
@@ -310,7 +361,7 @@ public class TestActionDBAccessorImpl {
     populateActionDB(db, hostName, requestId + 1, stageId);
     List<Long> requestIdsResult =
       db.getRequestsByStatus(null, BaseRequest.DEFAULT_PAGE_SIZE, false);
-    
+
     assertNotNull("List of request IDs is null", requestIdsResult);
     assertEquals("Request IDs not matches", requestIds, requestIdsResult);
   }
@@ -520,7 +571,26 @@ public class TestActionDBAccessorImpl {
     List<RequestResourceFilter> resourceFilters = new
       ArrayList<RequestResourceFilter>() {{ add(resourceFilter); }};
     ExecuteActionRequest executeActionRequest = new ExecuteActionRequest
-      ("cluster1", null, actionName, resourceFilters, null, null, false);
+        ("cluster1", null, actionName, resourceFilters, null, null, false);
+    Request request = new Request(stages, clusters);
+    db.persistActions(request);
+  }
+
+  private void populateActionDBWithServerAction(ActionDBAccessor db, String hostname,
+                                                long requestId, long stageId) throws AmbariException {
+    Stage s = new Stage(requestId, "/a/b", "cluster1", 1L, "action db accessor test",
+        "", "commandParamsStage", "hostParamsStage");
+    s.setStageId(stageId);
+    s.addServerActionCommand(serverActionName, Role.AMBARI_SERVER_ACTION, RoleCommand.ACTIONEXECUTE, clusterName, null, null, 300);
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(s);
+    final RequestResourceFilter resourceFilter = new RequestResourceFilter("AMBARI", "SERVER", Arrays.asList(hostname));
+    List<RequestResourceFilter> resourceFilters = new
+        ArrayList<RequestResourceFilter>() {{
+          add(resourceFilter);
+        }};
+    ExecuteActionRequest executeActionRequest = new ExecuteActionRequest
+        ("cluster1", null, serverActionName, resourceFilters, null, null, false);
     Request request = new Request(stages, clusters);
     db.persistActions(request);
   }

+ 3 - 3
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java

@@ -84,7 +84,7 @@ public class TestActionManager {
   public void testActionResponse() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), null, unitOfWork,
+        clusters, db, new HostsMap((String) null), unitOfWork,
         injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
@@ -127,7 +127,7 @@ public class TestActionManager {
   public void testLargeLogs() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
-        clusters, db, new HostsMap((String) null), null, unitOfWork,
+        clusters, db, new HostsMap((String) null), unitOfWork,
         injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, hostname);
     Stage stage = db.getAllStages(requestId).get(0);
@@ -217,7 +217,7 @@ public class TestActionManager {
 
     replay(queue, db, clusters);
 
-    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, null, unitOfWork,
+    ActionManager manager = new ActionManager(0, 0, queue, clusters, db, null, unitOfWork,
         injector.getInstance(RequestFactory.class), null);
     assertSame(listStages, manager.getActions(requestId));
 

+ 271 - 124
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

@@ -25,14 +25,7 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
 import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -52,9 +45,7 @@ import org.apache.ambari.server.agent.CommandReport;
 import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
-import org.apache.ambari.server.serveraction.ServerAction;
-import org.apache.ambari.server.serveraction.ServerActionManager;
-import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
+import org.apache.ambari.server.serveraction.MockServerAction;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Host;
@@ -65,6 +56,7 @@ import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
 import org.apache.ambari.server.utils.StageUtils;
 import org.easymock.Capture;
@@ -86,6 +78,8 @@ public class TestActionScheduler {
   private static final String CLUSTER_HOST_INFO_UPDATED = "{all_hosts=[c6401.ambari.apache.org,"
       + " c6402.ambari.apache.org], slave_hosts=[c6401.ambari.apache.org,"
       + " c6402.ambari.apache.org]}";
+
+  private final String serverHostname = StageUtils.getHostName();
   private final String hostname = "ahost.ambari.apache.org";
   private final int MAX_CYCLE_ITERATIONS = 100;
 
@@ -96,7 +90,7 @@ public class TestActionScheduler {
    */
   @Test
   public void testActionSchedule() throws Exception {
-    
+
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
     Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
 
@@ -116,7 +110,7 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    
+
     Host host = mock(Host.class);
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
@@ -141,7 +135,7 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 5, db, aq, fsm,
-        10000, new HostsMap((String) null), null, unitOfWork, conf);
+        10000, new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
@@ -167,7 +161,7 @@ public class TestActionScheduler {
       int expectedQueueSize, ActionScheduler scheduler) {
     int cycleCount = 0;
     while (cycleCount++ <= MAX_CYCLE_ITERATIONS) {
-      List<AgentCommand> ac = aq.dequeueAll(hostname);      
+      List<AgentCommand> ac = aq.dequeueAll(hostname);
       if (ac != null) {
         if (ac.size() == expectedQueueSize) {
           return ac;
@@ -239,7 +233,7 @@ public class TestActionScheduler {
 
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 0, db, aq, fsm, 3,
-        new HostsMap((String) null), null, unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
     // Start the thread
 
@@ -305,12 +299,10 @@ public class TestActionScheduler {
       }
     }).when(db).timeoutHostRole(anyString(), anyLong(), anyLong(), anyString());
 
-    ServerActionManager sam = EasyMock.createNiceMock(ServerActionManager.class);
-
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long) 100, (long) 50, db, aq, fsm, 3,
-                new HostsMap((String) null), sam, unitOfWork, conf).
+            new HostsMap((String) null), unitOfWork, conf).
         addMockedMethod("cancelHostRoleCommands").
         createMock();
     scheduler.cancelHostRoleCommands((Collection<HostRoleCommand>)EasyMock.anyObject(),EasyMock.anyObject(String.class));
@@ -425,7 +417,7 @@ public class TestActionScheduler {
 
     // Make sure the NN install doesn't timeout
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-      new HostsMap((String) null), null, unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
 
     int cycleCount=0;
@@ -485,7 +477,13 @@ public class TestActionScheduler {
     ServiceComponent scomp = mock(ServiceComponent.class);
     ServiceComponentHost sch = mock(ServiceComponentHost.class);
     UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    Host host = mock(Host.class);
+
+    when(host.getHostName()).thenReturn(serverHostname);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+
     when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(fsm.getHost(anyString())).thenReturn(host);
     when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
@@ -493,14 +491,12 @@ public class TestActionScheduler {
 
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
+    hosts.put(serverHostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1");
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
-    final Stage s = getStageWithServerAction(1, 977, hostname, payload, "test");
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -522,21 +518,134 @@ public class TestActionScheduler {
       }
     }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
 
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-        unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
 
-    int cycleCount=0;
-    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
+    int cycleCount = 0;
+    while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")
         .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
       scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
     }
 
-    assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"),
+    assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"),
         HostRoleStatus.COMPLETED);
+  }
+
+  /**
+   * Test server action
+   */
+  @Test
+  public void testServerActionTimeOut() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponentHost sch = mock(ServiceComponentHost.class);
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
+    when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(serverHostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    when(fsm.getHost(anyString())).thenReturn(host);
+    when(host.getState()).thenReturn(HostState.HEALTHY);
+    when(host.getHostName()).thenReturn(serverHostname);
+
+    List<Stage> stages = new ArrayList<Stage>();
+    Map<String, String> payload = new HashMap<String, String>();
+    payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout");
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 2);
+    stages.add(s);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    Request request = mock(Request.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(db.getRequest(anyLong())).thenReturn(request);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[3];
+        CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
+        HostRoleCommand command = s.getHostRoleCommand(host, role);
+        command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
+        return null;
+      }
+    }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
 
 
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, conf);
+
+    int cycleCount = 0;
+    while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION").isCompletedState()
+        && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
+      scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
+    }
+
+    assertEquals(HostRoleStatus.TIMEDOUT,
+        stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"));
   }
 
   @Test
@@ -558,13 +667,13 @@ public class TestActionScheduler {
 
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
+    hosts.put(serverHostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
-    final Stage s = getStageWithServerAction(1, 977, hostname, payload, "test");
+    payload.put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception");
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -586,34 +695,57 @@ public class TestActionScheduler {
       }
     }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
 
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
 
     int cycleCount = 0;
-    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
+    while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")
         .equals(HostRoleStatus.FAILED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
       scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
     }
-    assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"),
+    assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"),
         HostRoleStatus.FAILED);
     assertEquals("test", stages.get(0).getRequestContext());
   }
 
-  private static Stage getStageWithServerAction(long requestId, long stageId, String hostName,
-                                                Map<String, String> payload, String requestContext) {
+  private static Stage getStageWithServerAction(long requestId, long stageId,
+                                                Map<String, String> payload, String requestContext,
+                                                int timeout) {
+    String serverHostname = StageUtils.getHostName();
     Stage stage = new Stage(requestId, "/tmp", "cluster1", 1L, requestContext, CLUSTER_HOST_INFO,
-      "", "");
+      "{}", "{}");
     stage.setStageId(stageId);
-    long now = System.currentTimeMillis();
-    stage.addServerActionCommand(ServerAction.Command.FINALIZE_UPGRADE, Role.AMBARI_SERVER_ACTION,
+
+    stage.addServerActionCommand(MockServerAction.class.getName(), Role.AMBARI_SERVER_ACTION,
         RoleCommand.EXECUTE, "cluster1",
-        new ServiceComponentHostUpgradeEvent("AMBARI_SERVER_ACTION", hostName, now, "HDP-0.2"),
-        hostName);
-    ExecutionCommand execCmd = stage.getExecutionCommandWrapper(hostName,
-        Role.AMBARI_SERVER_ACTION.toString()).getExecutionCommand();
+        new ServiceComponentHostServerActionEvent(serverHostname, System.currentTimeMillis()),
+        payload,
+        timeout);
 
-    execCmd.setCommandParams(payload);
     return stage;
   }
 
@@ -666,14 +798,14 @@ public class TestActionScheduler {
                     RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
 
     stages.add(
-            getStageWithSingleTask(
-                    hostname3, "cluster1", Role.DATANODE,
-                    RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+        getStageWithSingleTask(
+            hostname3, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
 
     stages.add( // Stage with the same request id, should not be scheduled
-            getStageWithSingleTask(
-                    hostname4, "cluster1", Role.GANGLIA_MONITOR,
-                    RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+        getStageWithSingleTask(
+            hostname4, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
@@ -686,12 +818,11 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-            unitOfWork, conf);
+            new HostsMap((String) null), unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -740,24 +871,24 @@ public class TestActionScheduler {
                     hostname1, "cluster1", Role.DATANODE,
                     RoleCommand.START, Service.Type.HDFS, 1, 1, 1));
     stages.add( // Stage with the same hostname, should not be scheduled
-            getStageWithSingleTask(
-                    hostname1, "cluster1", Role.GANGLIA_MONITOR,
-                    RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
+        getStageWithSingleTask(
+            hostname1, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
 
     stages.add(
-            getStageWithSingleTask(
-                    hostname2, "cluster1", Role.DATANODE,
-                    RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
+        getStageWithSingleTask(
+            hostname2, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
 
     stages.add(
-            getStageWithSingleTask(
-                    hostname3, "cluster1", Role.DATANODE,
-                    RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
+        getStageWithSingleTask(
+            hostname3, "cluster1", Role.DATANODE,
+            RoleCommand.START, Service.Type.HDFS, 4, 4, 4));
 
     stages.add( // Stage with the same request id, should not be scheduled
-            getStageWithSingleTask(
-                    hostname4, "cluster1", Role.GANGLIA_MONITOR,
-                    RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
+        getStageWithSingleTask(
+            hostname4, "cluster1", Role.GANGLIA_MONITOR,
+            RoleCommand.START, Service.Type.GANGLIA, 5, 5, 4));
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
@@ -771,13 +902,13 @@ public class TestActionScheduler {
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "false");
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+            new HostsMap((String) null),
             unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            new ServerActionManagerImpl(fsm), unitOfWork,
-            requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork,
+        requestFactory, conf);
 
     scheduler.doWork();
 
@@ -805,7 +936,7 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
-    
+
     String hostname1 = "ahost.ambari.apache.org";
     String hostname2 = "bhost.ambari.apache.org";
     HashMap<String, ServiceComponentHost> hosts =
@@ -813,26 +944,26 @@ public class TestActionScheduler {
     hosts.put(hostname1, sch);
     hosts.put(hostname2, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
-    
+
     List<Stage> stages = new ArrayList<Stage>();
     Stage backgroundStage = null;
     stages.add(//stage with background command
         backgroundStage = getStageWithSingleTask(
             hostname1, "cluster1", Role.NAMENODE, RoleCommand.CUSTOM_COMMAND, "REBALANCEHDFS", Service.Type.HDFS, 1, 1, 1));
-    
+
     Assert.assertEquals(AgentCommandType.BACKGROUND_EXECUTION_COMMAND ,backgroundStage.getExecutionCommands(hostname1).get(0).getExecutionCommand().getCommandType());
-    
+
     stages.add( // Stage with the same hostname, should be scheduled
         getStageWithSingleTask(
             hostname1, "cluster1", Role.GANGLIA_MONITOR,
             RoleCommand.START, Service.Type.GANGLIA, 2, 2, 2));
-    
+
     stages.add(
         getStageWithSingleTask(
             hostname2, "cluster1", Role.DATANODE,
             RoleCommand.START, Service.Type.HDFS, 3, 3, 3));
-    
-    
+
+
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
     Request request = mock(Request.class);
@@ -840,21 +971,21 @@ public class TestActionScheduler {
     when(db.getRequest(anyLong())).thenReturn(request);
 
     when(db.getStagesInProgress()).thenReturn(stages);
-    
+
     Properties properties = new Properties();
     properties.put(Configuration.PARALLEL_STAGE_EXECUTION_KEY, "true");
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        new HostsMap((String) null),
         unitOfWork, conf);
-    
+
     ActionManager am = new ActionManager(
         2, 2, aq, fsm, db, new HostsMap((String) null),
-        new ServerActionManagerImpl(fsm), unitOfWork,
+        unitOfWork,
         requestFactory, conf);
-    
+
     scheduler.doWork();
-    
+
     Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname1, "NAMENODE"));
     Assert.assertEquals(HostRoleStatus.QUEUED, stages.get(2).getHostRoleStatus(hostname2, "DATANODE"));
 
@@ -966,12 +1097,11 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
 
     Capture<Collection<HostRoleCommand>> cancelCommandList = new Capture<Collection<HostRoleCommand>>();
     ActionScheduler scheduler = EasyMock.createMockBuilder(ActionScheduler.class).
         withConstructor((long)100, (long)50, db, aq, fsm, 3,
-          new HostsMap((String) null), serverActionManager,
+          new HostsMap((String) null),
           unitOfWork, conf).
           addMockedMethod("cancelHostRoleCommands").
           createMock();
@@ -981,7 +1111,7 @@ public class TestActionScheduler {
     EasyMock.replay(scheduler);
 
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), serverActionManager, unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1140,10 +1270,10 @@ public class TestActionScheduler {
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 10000, db, aq, fsm, 3,
-        new HostsMap((String) null), new ServerActionManagerImpl(fsm),
+        new HostsMap((String) null),
         unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 10000, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+        2, 10000, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1326,9 +1456,9 @@ public class TestActionScheduler {
     Configuration conf = new Configuration(properties);
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
         new HostsMap((String) null),
-        new ServerActionManagerImpl(fsm), unitOfWork, conf);
+        unitOfWork, conf);
     ActionManager am = new ActionManager(
-        2, 2, aq, fsm, db, new HostsMap((String) null), new ServerActionManagerImpl(fsm), unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null), unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1419,7 +1549,7 @@ public class TestActionScheduler {
     assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.NAMENODE)));
     assertEquals(new Float(1.0), new Float(s.getSuccessFactor(Role.GANGLIA_SERVER)));
   }
-  
+
   @Test
   public void testSuccessCriteria() {
     RoleStats rs1 = new RoleStats(1, (float)0.5);
@@ -1427,37 +1557,37 @@ public class TestActionScheduler {
     assertTrue(rs1.isSuccessFactorMet());
     rs1.numSucceeded = 0;
     assertFalse(rs1.isSuccessFactorMet());
-    
+
     RoleStats rs2 = new RoleStats(2, (float)0.5);
     rs2.numSucceeded = 1;
     assertTrue(rs2.isSuccessFactorMet());
-    
+
     RoleStats rs3 = new RoleStats(3, (float)0.5);
     rs3.numSucceeded = 2;
     assertTrue(rs2.isSuccessFactorMet());
     rs3.numSucceeded = 1;
     assertFalse(rs3.isSuccessFactorMet());
-    
+
     RoleStats rs4 = new RoleStats(3, (float)1.0);
     rs4.numSucceeded = 2;
     assertFalse(rs3.isSuccessFactorMet());
   }
-  
+
   /**
    * This test sends verifies that ActionScheduler returns up-to-date cluster host info and caching works correctly.
    */
   @Test
   public void testClusterHostInfoCache() throws Exception {
-    
+
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
-    
+
     //Data for stages
     Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
     Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
     int stageId = 1;
     int requestId1 = 1;
     int requestId2 = 2;
-    
+
     ActionQueue aq = new ActionQueue();
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
@@ -1497,19 +1627,19 @@ public class TestActionScheduler {
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm,
-        10000, new HostsMap((String) null), null, unitOfWork, conf);
+        10000, new HostsMap((String) null), unitOfWork, conf);
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
 
     assertTrue(ac.get(0) instanceof ExecutionCommand);
     assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
-    
+
     assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
-    
+
 
     when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
-    
+
     //Verify that ActionSheduler does not return cached value of cluster host info for new requestId
     ac = waitForQueueSize(hostname, aq, 1, scheduler);
     assertTrue(ac.get(0) instanceof ExecutionCommand);
@@ -1576,7 +1706,7 @@ public class TestActionScheduler {
     when(db.getStagesInProgress()).thenReturn(stages);
 
     ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
-            new HostsMap((String) null), null, unitOfWork, conf);
+            new HostsMap((String) null), unitOfWork, conf);
 
     final CountDownLatch abortCalls = new CountDownLatch(2);
 
@@ -1639,15 +1769,13 @@ public class TestActionScheduler {
 
     HashMap<String, ServiceComponentHost> hosts =
             new HashMap<String, ServiceComponentHost>();
-    hosts.put(hostname, sch);
+    hosts.put(serverHostname, sch);
     when(scomp.getServiceComponentHosts()).thenReturn(hosts);
 
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1");
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
-    final Stage s = getStageWithServerAction(1, 977, hostname, payload, "test");
-    s.getExecutionCommands().get("ahost.ambari.apache.org").get(0).getExecutionCommand().setServiceName(null);
+    final Stage s = getStageWithServerAction(1, 977, payload, "test", 300);
+    s.getExecutionCommands().get(serverHostname).get(0).getExecutionCommand().setServiceName(null);
     stages.add(s);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
@@ -1668,19 +1796,40 @@ public class TestActionScheduler {
         return null;
       }
     }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
 
+        HostRoleCommand task = s.getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return s.getHostRoleCommand(serverHostname, "AMBARI_SERVER_ACTION");
+      }
+    }).when(db).getTask(anyLong());
 
     ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), new ServerActionManagerImpl(fsm),
-            unitOfWork, conf);
+        new HostsMap((String) null), unitOfWork, conf);
 
     int cycleCount = 0;
-    while (!stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION")
-            .equals(HostRoleStatus.COMPLETED) && cycleCount <= MAX_CYCLE_ITERATIONS) {
+    while (!stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION")
+        .equals(HostRoleStatus.COMPLETED) && cycleCount++ <= MAX_CYCLE_ITERATIONS) {
       scheduler.doWork();
+      scheduler.getServerActionExecutor().doWork();
     }
 
-    assertEquals(stages.get(0).getHostRoleStatus(hostname, "AMBARI_SERVER_ACTION"),
+    assertEquals(stages.get(0).getHostRoleStatus(serverHostname, "AMBARI_SERVER_ACTION"),
             HostRoleStatus.COMPLETED);
   }
 
@@ -1800,14 +1949,13 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
 
-    ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3,
-                    new HostsMap((String) null), serverActionManager, unitOfWork, conf);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            serverActionManager, unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork, requestFactory, conf);
 
     scheduler.doWork();
 
@@ -1967,14 +2115,13 @@ public class TestActionScheduler {
 
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
-    ServerActionManagerImpl serverActionManager = new ServerActionManagerImpl(fsm);
 
-    ActionScheduler scheduler =new ActionScheduler(100, 50, db, aq, fsm, 3,
-            new HostsMap((String) null), serverActionManager, unitOfWork, conf);
+    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
+        new HostsMap((String) null), unitOfWork, conf);
 
     ActionManager am = new ActionManager(
-            2, 2, aq, fsm, db, new HostsMap((String) null),
-            serverActionManager, unitOfWork, requestFactory, conf);
+        2, 2, aq, fsm, db, new HostsMap((String) null),
+        unitOfWork, requestFactory, conf);
 
     // Execution of request 1
 

+ 2 - 4
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -77,7 +77,6 @@ import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
-import org.apache.ambari.server.serveraction.ServerActionManager;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
@@ -723,7 +722,7 @@ public class TestHeartbeatHandler {
     clusters.addCluster(DummyCluster);
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
     ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(), clusters, db,
-        new HostsMap((String) null), null, unitOfWork, injector.getInstance(RequestFactory.class), null);
+        new HostsMap((String) null), unitOfWork, injector.getInstance(RequestFactory.class), null);
     populateActionDB(db, DummyHostname1);
     Stage stage = db.getAllStages(requestId).get(0);
     Assert.assertEquals(stageId, stage.getStageId());
@@ -2109,13 +2108,12 @@ public class TestHeartbeatHandler {
   private ActionManager getMockActionManager() {
     ActionQueue actionQueueMock = createNiceMock(ActionQueue.class);
     Clusters clustersMock = createNiceMock(Clusters.class);
-    ServerActionManager serverActionManagerMock = createNiceMock(ServerActionManager.class);
     Configuration configurationMock = createNiceMock(Configuration.class);
 
     ActionManager actionManager = createMockBuilder(ActionManager.class).
             addMockedMethod("getTasks").
             withConstructor((long)0, (long)0, actionQueueMock, clustersMock,
-                    actionDBAccessor, new HostsMap((String) null), serverActionManagerMock, unitOfWork,
+                    actionDBAccessor, new HostsMap((String) null), unitOfWork,
                     injector.getInstance(RequestFactory.class), configurationMock).
             createMock();
     return actionManager;

+ 1 - 24
ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java

@@ -91,8 +91,6 @@ import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.entities.ExecutionCommandEntity;
 import org.apache.ambari.server.security.authorization.Users;
 import org.apache.ambari.server.serveraction.ServerAction;
-import org.apache.ambari.server.serveraction.ServerActionManager;
-import org.apache.ambari.server.serveraction.ServerActionManagerImpl;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.Config;
@@ -7138,25 +7136,6 @@ public class AmbariManagementControllerTest {
     assertEquals(0, response.getCustomCommands().size());
   }
 
-  @Test
-  public void testServerActionForUpgradeFinalization() throws AmbariException {
-    String clusterName = "foo1";
-    StackId currentStackId = new StackId("HDP-0.1");
-    StackId newStackId = new StackId("HDP-0.2");
-
-    createCluster(clusterName);
-    Cluster c = clusters.getCluster(clusterName);
-    c.setDesiredStackVersion(currentStackId);
-    Assert.assertTrue(c.getCurrentStackVersion().equals(currentStackId));
-
-    ServerActionManager serverActionManager = new ServerActionManagerImpl(clusters);
-    Map<String, String> payload = new HashMap<String, String>();
-    payload.put(ServerAction.PayloadName.CLUSTER_NAME, clusterName);
-    payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, newStackId.getStackId());
-    serverActionManager.executeAction(ServerAction.Command.FINALIZE_UPGRADE, payload);
-    Assert.assertTrue(c.getCurrentStackVersion().equals(newStackId));
-  }
-
   // disabled as upgrade feature is disabled
   @Ignore
   @Test
@@ -7485,9 +7464,7 @@ public class AmbariManagementControllerTest {
           currRoleOrder = expectedTasks.getRoleOrder(command.getRole());
           ExecutionCommand execCommand = command.getExecutionCommandWrapper().getExecutionCommand();
           Assert.assertTrue(
-              execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CURRENT_STACK_VERSION));
-          Assert.assertTrue(
-              execCommand.getCommandParams().containsKey(ServerAction.PayloadName.CLUSTER_NAME));
+              execCommand.getRoleParams().containsKey(ServerAction.ACTION_NAME));
           Assert.assertEquals(RoleCommand.EXECUTE, execCommand.getRoleCommand());
         } else {
           Assert.assertTrue(command.toString(), expectedTasks.isTaskExpected(command.getRole(), command.getHostName()));

+ 92 - 0
ambari-server/src/test/java/org/apache/ambari/server/serveraction/MockServerAction.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * The MockServerAction is an implementation of a ServerAction strictly used to testing purposes.
+ * <p/>
+ * This class helps to generate several scenarios from success cases to failure cases.  The
+ * force_fail command parameter can be used to generate different failure cases:
+ * <ul>
+ * <li>exception
+ * - Causes the action to fail by throwing an AmbariException</li>
+ * <li>timeout
+ * - Causes the action to fail by timing out (the COMMAND_TIMEOUT value must be set to a reasonable
+ * value)</li>
+ * </dl>
+ *
+ * If not instructed to fail, this implementation will attempt to increment a "data" counter in a
+ * shared data context - if available.
+ */
+public class MockServerAction extends AbstractServerAction {
+
+  public static final String PAYLOAD_FORCE_FAIL = "force_fail";
+
+  @Override
+  public CommandReport execute(ConcurrentMap<String, Object> requestSharedDataContext)
+      throws AmbariException, InterruptedException {
+
+    Map<String, String> commandParameters = getCommandParameters();
+
+    if (commandParameters == null) {
+      throw new AmbariException("Missing payload");
+    } else if ("exception".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) {
+      throw new AmbariException("Failing execution by request");
+    } else if ("report".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) {
+      return createCommandReport(1, HostRoleStatus.FAILED, null, "Forced fail via command", "Failing execution by request");
+    } else {
+      if ("timeout".equalsIgnoreCase(commandParameters.get(PAYLOAD_FORCE_FAIL))) {
+        Long timeout;
+
+        try {
+          timeout = (commandParameters.containsKey(ExecutionCommand.KeyNames.COMMAND_TIMEOUT))
+              ? Long.parseLong(commandParameters.get(ExecutionCommand.KeyNames.COMMAND_TIMEOUT)) * 1000 // Convert seconds to milliseconds
+              : null;
+        } catch (NumberFormatException e) {
+          timeout = null;
+        }
+
+        if (timeout != null) {
+          Thread.sleep(timeout * 10);
+        }
+      }
+
+      // Test updating the shared data context...
+      if (requestSharedDataContext != null) {
+        Integer data = (Integer) requestSharedDataContext.get("Data");
+
+        if (data == null) {
+          data = 0;
+        }
+
+        requestSharedDataContext.put("Data", ++data);
+      }
+
+      return createCommandReport(0, HostRoleStatus.COMPLETED, null, "Success!", null);
+    }
+  }
+}

+ 248 - 0
ambari-server/src/test/java/org/apache/ambari/server/serveraction/ServerActionExecutorTest.java

@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.serveraction;
+
+import org.apache.ambari.server.Role;
+import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.actionmanager.*;
+import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.state.svccomphost.ServiceComponentHostServerActionEvent;
+import org.apache.ambari.server.utils.StageUtils;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ServerActionExecutorTest {
+  private static final int MAX_CYCLE_ITERATIONS = 1000;
+  private static final String SERVER_HOST_NAME = StageUtils.getHostName();
+  private static final String CLUSTER_HOST_INFO = "{all_hosts=["
+      + SERVER_HOST_NAME + "], slave_hosts=["
+      + SERVER_HOST_NAME + "]}";
+
+  /**
+   * Test a normal server action
+   */
+  @Test
+  public void testServerAction() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1, 977, null, "test", 300);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.COMPLETED, getTaskStatus(s));
+  }
+
+
+  /**
+   * Test a timeout server action
+   */
+  @Test
+  public void testServerActionTimeout() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1,
+        977,
+        new HashMap<String, String>() {{
+          put(MockServerAction.PAYLOAD_FORCE_FAIL, "timeout");
+        }},
+        "test",
+        1);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.TIMEDOUT, getTaskStatus(s));
+  }
+
+
+  /**
+   * Test a timeout server action
+   */
+  @Test
+  public void testServerActionFailedException() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1,
+        977,
+        new HashMap<String, String>() {{
+          put(MockServerAction.PAYLOAD_FORCE_FAIL, "exception");
+        }},
+        "test",
+        1);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.FAILED, getTaskStatus(s));
+  }
+
+  /**
+   * Test a timeout server action
+   */
+  @Test
+  public void testServerActionFailedReport() throws Exception {
+    final Request request = createMockRequest();
+    final Stage s = getStageWithServerAction(1,
+        977,
+        new HashMap<String, String>() {{
+          put(MockServerAction.PAYLOAD_FORCE_FAIL, "report");
+        }},
+        "test",
+        1);
+    final List<Stage> stages = new ArrayList<Stage>() {
+      {
+        add(s);
+      }
+    };
+    ActionDBAccessor db = createMockActionDBAccessor(request, stages);
+    ServerActionExecutor executor = new ServerActionExecutor(db, 10000);
+
+    // Force the task to be QUEUED
+    s.getHostRoleCommand(SERVER_HOST_NAME, Role.AMBARI_SERVER_ACTION.toString()).setStatus(HostRoleStatus.QUEUED);
+
+    int cycleCount = 0;
+    while (!getTaskStatus(s).isCompletedState() && (cycleCount++ <= MAX_CYCLE_ITERATIONS)) {
+      executor.doWork();
+    }
+
+    assertEquals(HostRoleStatus.FAILED, getTaskStatus(s));
+  }
+
+  private HostRoleStatus getTaskStatus(List<Stage> stages, int i) {
+    return getTaskStatus(stages.get(i));
+  }
+
+  private HostRoleStatus getTaskStatus(Stage stage) {
+    return stage.getHostRoleStatus(SERVER_HOST_NAME, "AMBARI_SERVER_ACTION");
+  }
+
+  private Request createMockRequest() {
+    Request request = mock(Request.class);
+    when(request.isExclusive()).thenReturn(false);
+    when(request.getRequestId()).thenReturn(1L);
+    return request;
+  }
+
+  private ActionDBAccessor createMockActionDBAccessor(final Request request, final List<Stage> stages) {
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        RequestStatus status = (RequestStatus) invocation.getArguments()[0];
+
+        if (status == RequestStatus.IN_PROGRESS) {
+          return Arrays.asList(request);
+        } else {
+          return Collections.emptyList();
+        }
+      }
+    }).when(db).getRequestsByStatus(any(RequestStatus.class), anyInt(), anyBoolean());
+
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[3];
+        CommandReport commandReport = (CommandReport) invocation.getArguments()[4];
+        HostRoleCommand command = stages.get(0).getHostRoleCommand(host, role);
+        command.setStatus(HostRoleStatus.valueOf(commandReport.getStatus()));
+        return null;
+      }
+    }).when(db).updateHostRoleState(anyString(), anyLong(), anyLong(), anyString(), any(CommandReport.class));
+    doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String host = (String) invocation.getArguments()[0];
+        String role = (String) invocation.getArguments()[1];
+        HostRoleStatus status = (HostRoleStatus) invocation.getArguments()[2];
+
+        HostRoleCommand task = stages.get(0).getHostRoleCommand(host, role);
+
+        if (task.getStatus() == status) {
+          return Arrays.asList(task);
+        } else {
+          return null;
+        }
+      }
+    }).when(db).getTasksByHostRoleAndStatus(anyString(), anyString(), any(HostRoleStatus.class));
+
+    return db;
+  }
+
+  private static Stage getStageWithServerAction(long requestId, long stageId,
+                                                Map<String, String> payload, String requestContext,
+                                                int timeout) {
+    Stage stage = new Stage(requestId, "/tmp", "cluster1", 1L, requestContext, CLUSTER_HOST_INFO,
+        "{}", "{}");
+
+    stage.setStageId(stageId);
+    stage.addServerActionCommand(MockServerAction.class.getName(), Role.AMBARI_SERVER_ACTION,
+        RoleCommand.EXECUTE, "cluster1",
+        new ServiceComponentHostServerActionEvent(SERVER_HOST_NAME, System.currentTimeMillis()),
+        payload, timeout);
+
+    return stage;
+  }
+}