Procházet zdrojové kódy

AMBARI-732. Action scheduler tests.

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1386969 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey před 12 roky
rodič
revize
0e7d852128

+ 2 - 0
AMBARI-666-CHANGES.txt

@@ -18,6 +18,8 @@ AMBARI-666 branch (unreleased changes)
   AMBARI-746. Integrate configuration properties and custom configuration 
   file overrides in Customize Services page. (yusaku)
 
+  AMBARI-732. Action scheduler unit tests. (jitendra)
+
   AMBARI-739. Cluster fsm implementation. (hitesh)
 
   AMBARI-745. Add unit tests for Installer Step 1 (Welcome page). (yusaku)

+ 12 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java

@@ -38,7 +38,6 @@ public class ActionDBAccessor {
   
   /**
    * Returns all the actions that have been queued but not completed yet.
-   * This is used by scheduler to find all pending actions.
    */
   public List<Stage> getQueuedStages() {
     return null;
@@ -65,4 +64,16 @@ public class ActionDBAccessor {
   public void timeoutHostRole(long requestId, long stageId, Role role) {
     // TODO Auto-generated method stub
   }
+
+  /**
+   * Returns all the pending stages, including queued and not-queued.
+   */
+  public List<Stage> getPendingStages() {
+    return null;
+  }
+
+  public void persistActions(List<Stage> stages) {
+    // TODO Auto-generated method stub
+    
+  }
 }

+ 10 - 4
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java

@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.state.live.Clusters;
 
 
 /**
@@ -30,11 +31,15 @@ public class ActionManager {
   private final ActionScheduler scheduler;
   private final ActionDBAccessor db;
   private final ActionQueue actionQueue;
-  public ActionManager(long schedulerSleepTime, long actionTimeout, ActionQueue aq) {
+  private final Clusters fsm;
+
+  public ActionManager(long schedulerSleepTime, long actionTimeout,
+      ActionQueue aq, Clusters fsm) {
     this.actionQueue = aq;
     db = new ActionDBAccessor();
-    scheduler = new ActionScheduler(schedulerSleepTime,
-        actionTimeout, db, actionQueue);
+    scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
+        actionQueue, fsm, 2);
+    this.fsm = fsm;
   }
   
   public void initialize() {
@@ -46,10 +51,11 @@ public class ActionManager {
   }
   
   public void sendActions(List<Stage> stages) {
-    //Store all these actions to the db
+    db.persistActions(stages);
   }
 
   public List<Stage> getRequestStatus(String requestId) {
+    //fetch status from db
     return null;
   }
 

+ 50 - 12
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -23,28 +23,38 @@ import java.util.TreeMap;
 
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.agent.ActionQueue;
-import org.apache.ambari.server.agent.AgentCommand;
 import org.apache.ambari.server.agent.ExecutionCommand;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
+import org.apache.ambari.server.state.live.Clusters;
+import org.apache.ambari.server.state.live.ServiceComponentHostEvent;
+import org.apache.ambari.server.state.live.ServiceComponentHostEventType;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 //This class encapsulates the action scheduler thread. 
 //Action scheduler frequently looks at action database and determines if
 //there is an action that can be scheduled.
 class ActionScheduler implements Runnable {
   
+  private static Log LOG = LogFactory.getLog(ActionScheduler.class);
   private final long actionTimeout;
   private final long sleepTime;
   private volatile boolean shouldRun = true;
   private Thread schedulerThread = null;
   private final ActionDBAccessor db;
-  private final short maxAttempts = 2;
+  private final short maxAttempts;
   private final ActionQueue actionQueue;
+  private final Clusters fsmObject;
   
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
-      ActionDBAccessor db, ActionQueue actionQueue) {
+      ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
+      int maxAttempts) {
     this.sleepTime = sleepTimeMilliSec;
     this.actionTimeout = actionTimeoutMilliSec;
     this.db = db;
     this.actionQueue = actionQueue;
+    this.fsmObject = fsmObject;
+    this.maxAttempts = (short) maxAttempts;
   }
   
   public void start() {
@@ -73,7 +83,7 @@ class ActionScheduler implements Runnable {
   }
   
   private void doWork() {
-    List<Stage> stages = db.getQueuedStages();
+    List<Stage> stages = db.getPendingStages();
     if (stages == null || stages.isEmpty()) {
       //Nothing to do
       return;
@@ -112,23 +122,51 @@ class ActionScheduler implements Runnable {
       Map<String, HostRoleCommand> hrcMap) {
     for (String host : hrcMap.keySet()) {
       HostRoleCommand hrc = hrcMap.get(host);
+      if ( (hrc.getStatus() != HostRoleStatus.PENDING) &&
+           (hrc.getStatus() != HostRoleStatus.QUEUED) ) {
+        //This task has been executed
+        continue;
+      }
       long now = System.currentTimeMillis();
-      if (now > hrc.getExpiryTime()) {
-        // expired
-        if (now > hrc.getStartTime() + actionTimeout * maxAttempts) {
+      if (now > hrc.getLastAttemptTime()+actionTimeout) {
+        LOG.info("Host:"+host+", role:"+hrc.getRole()+", actionId:"+stage.getActionId()+" timed out");
+        if (hrc.getAttemptCount() >= maxAttempts) {
+          LOG.warn("Host:"+host+", role:"+hrc.getRole()+", actionId:"+stage.getActionId()+" expired");
           // final expired
-          db.timeoutHostRole(stage.getRequestId(), stage.getStageId(), hrc.getRole());
+          ServiceComponentHostEvent timeoutEvent = new ServiceComponentHostEvent(
+              ServiceComponentHostEventType.HOST_SVCCOMP_OP_FAILED, hrc
+                  .getRole().toString(), hrc.getHostName(), now);
+          try {
+            fsmObject.getCluster(stage.getClusterName())
+                .handleServiceComponentHostEvent("", hrc.getRole().toString(),
+                    hrc.getHostName(), timeoutEvent);
+          } catch (InvalidStateTransitonException e) {
+            // TODO: propagate this exception; for now it is only printed
+            e.printStackTrace();
+          }
+          db.timeoutHostRole(stage.getRequestId(), stage.getStageId(),
+              hrc.getRole());
         } else {
-          rescheduleHostRole(stage, hrc);
+          scheduleHostRole(stage, hrc);
         }
       }
     }
   }
 
-  private void rescheduleHostRole(Stage s,
-      HostRoleCommand hrc) {
+  private void scheduleHostRole(Stage s, HostRoleCommand hrc) {
+    LOG.info("Host:"+hrc.getHostName()+", role:"+hrc.getRole()+", actionId:"+s.getActionId()+" being scheduled");
     long now = System.currentTimeMillis();
-    hrc.setExpiryTime(now);
+    if (hrc.getStartTime() < 0) {
+      try {
+        fsmObject.getCluster(s.getClusterName())
+            .handleServiceComponentHostEvent("", "", hrc.getHostName(),
+                hrc.getEvent());
+      } catch (InvalidStateTransitonException e) {
+        e.printStackTrace();
+      }
+    }
+    hrc.setLastAttemptTime(now);
+    hrc.incrementAttemptCount();
     ExecutionCommand cmd = new ExecutionCommand();
     cmd.setCommandId(s.getActionId());
     cmd.setManifest(s.getManifest(hrc.getHostName()));

+ 14 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java

@@ -17,6 +17,7 @@
  */
 package org.apache.ambari.server.actionmanager;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -24,15 +25,27 @@ public class HostAction {
   private final String host;
   private Map<String, String> params = null;
   private String manifest = null;
-  private List<HostRoleCommand> roles;
+  private final List<HostRoleCommand> roles;
 
   public HostAction(String host) {
     this.host = host;
+    roles = new ArrayList<HostRoleCommand>();
+  }
+  
+  public void addHostRoleCommand(HostRoleCommand cmd) {
+    roles.add(cmd);
   }
   
   public List<HostRoleCommand> getRoleCommands() {
     return roles;
   }
+  
+  public void setManifest(String manifest) {
+    if (this.manifest != null) {
+      throw new RuntimeException("Not allowed to set manifest twice");
+    }
+    this.manifest = manifest;
+  }
 
   public String getManifest() {
     if (manifest == null) {

+ 27 - 7
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java

@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.state.live.ServiceComponentHostEvent;
 
 /**
  * This class encapsulates all the information for an action
@@ -32,14 +33,18 @@ public class HostRoleCommand {
   private Map<String, String> params = null;
   private HostRoleStatus status = HostRoleStatus.PENDING;
   private final RoleCommand cmd;
-  private long startTime;
-  private long expiryTime;
+  private long startTime = -1;
+  private long lastAttemptTime = -1;
+  private short attemptCount = 0;
   private final String host;
+  private final ServiceComponentHostEvent event;
 
-  public HostRoleCommand(String host, Role role, RoleCommand cmd) {
+  public HostRoleCommand(String host, Role role, RoleCommand cmd,
+      ServiceComponentHostEvent event) {
     this.host = host;
     this.role = role;
     this.cmd = cmd;
+    this.event = event;
   }
 
   public Role getRole() {
@@ -54,15 +59,30 @@ public class HostRoleCommand {
     return startTime;
   }
   
-  public long getExpiryTime() {
-    return expiryTime;
+  public long getLastAttemptTime() {
+    return this.lastAttemptTime;
   }
   
-  public void setExpiryTime(long t) {
-    expiryTime = t;
+  public void setLastAttemptTime(long t) {
+    this.lastAttemptTime = t;
   }
 
   public String getHostName() {
     return this.host;
   }
+  
+  public ServiceComponentHostEvent getEvent() {
+    return event;
+  }
+  public void incrementAttemptCount() {
+    this.attemptCount ++;
+  }
+  
+  public short getAttemptCount() {
+    return this.attemptCount;
+  }
+  
+  void setStatus(HostRoleStatus status) {
+    this.status = status;
+  }
 }

+ 7 - 2
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -18,7 +18,6 @@
 package org.apache.ambari.server.actionmanager;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -28,6 +27,7 @@ import org.apache.ambari.server.Role;
 //required to persist an action.
 public class Stage {
   private final long requestId;
+  private final String clusterName;
   private long stageId = -1;
   
   //Map of roles to successFactors for this stage. Default is 1 i.e. 100%
@@ -37,9 +37,10 @@ public class Stage {
   private Map<String, HostAction> hostActions = new TreeMap<String, HostAction>();
   private final String logDir;
   
-  public Stage(long requestId, String logDir) {
+  public Stage(long requestId, String logDir, String clusterName) {
     this.requestId = requestId;
     this.logDir = logDir;
+    this.clusterName = clusterName;
   }
   
   public synchronized void setStageId(long stageId) {
@@ -90,4 +91,8 @@ public class Stage {
     // TODO Auto-generated method stub
     return getHostAction(hostName).getManifest();
   }
+  
+  public String getClusterName() {
+    return clusterName;
+  }
 }

+ 24 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java

@@ -18,14 +18,26 @@
 package org.apache.ambari.server.agent;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Queue;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class ActionQueue {
+  
+  private static Log LOG = LogFactory.getLog(ActionQueue.class);
+  
   Map<String, Queue<AgentCommand>> hostQueues;
 
+  public ActionQueue() {
+    hostQueues = new TreeMap<String, Queue<AgentCommand>>();
+  }
+  
   private synchronized Queue<AgentCommand> getQueue(String hostname) {
     return hostQueues.get(hostname);
   }
@@ -35,7 +47,16 @@ public class ActionQueue {
   }
 
   public void enqueue(String hostname, AgentCommand cmd) {
+    
     Queue<AgentCommand> q = getQueue(hostname);
+    if (q == null) {
+      q = new LinkedList<AgentCommand>();
+      addQueue(hostname, q);
+    }
+    if (q.contains(cmd)) {
+      LOG.warn("cmd already exists in the queue, not adding again");
+      return;
+    }
     synchronized (q) {
       q.add(cmd);
     }
@@ -43,6 +64,9 @@ public class ActionQueue {
 
   public AgentCommand dequeue(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);
+    if (q == null) {
+      return null;
+    }
     synchronized (q) {
       return q.remove();
     }

+ 17 - 2
ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java

@@ -28,7 +28,7 @@ import javax.xml.bind.annotation.XmlType;
 @XmlType(name = "", propOrder = {})
 public class ExecutionCommand extends AgentCommand {
   @XmlElement
-  String manifest;
+  String manifest ="";
   
   public String getManifest() {
     return this.manifest;
@@ -37,4 +37,19 @@ public class ExecutionCommand extends AgentCommand {
   public void setManifest(String manifest) {
     this.manifest = manifest;
   }
-}
+  
+  @Override //Object
+  public boolean equals(Object other) {
+    if (!(other instanceof ExecutionCommand)) {
+      return false;
+    }
+    ExecutionCommand o = (ExecutionCommand)other;
+    return  this.manifest.equals(o.getManifest());
+  }
+  
+  @Override //Object
+  public int hashCode() {
+    //Assume two different actions will always have a different manifest
+    return manifest.hashCode();
+  }
+ }