Browse Source

AMBARI-754. Heartbeat handler: Registration response should query component status.

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1392523 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 13 năm trước cách đây
mục cha
commit
a8461c0fc4
21 tập tin đã thay đổi với 227 bổ sung126 xóa
  1. 3 0
      AMBARI-666-CHANGES.txt
  2. 3 19
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java
  3. 2 33
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
  4. 42 39
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java
  5. 2 2
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java
  6. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java
  7. 7 6
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java
  8. 13 0
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
  9. 7 4
      ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
  10. 8 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
  11. 28 9
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
  12. 2 1
      ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
  13. 20 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatus.java
  14. 8 6
      ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
  15. 8 0
      ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
  16. 4 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/Cluster.java
  17. 6 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/ClusterImpl.java
  18. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/host/HostEvent.java
  19. 4 4
      ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java
  20. 0 1
      ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java
  21. 58 0
      ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

+ 3 - 0
AMBARI-666-CHANGES.txt

@@ -12,6 +12,9 @@ AMBARI-666 branch (unreleased changes)
 
   NEW FEATURES
 
+  AMBARI-754. Heartbeat handler: Registration response should query component 
+  status. (jitendra)
+
   AMBARI-779. Introduce ManagementController interface. (Tom Beerbower via hitesh)
 
   AMBARI-755. Heartbeat handler: Update state as reported in heartbeat.

+ 3 - 19
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java

@@ -23,26 +23,9 @@ import org.apache.ambari.server.Role;
 
 public interface ActionDBAccessor {
 
-  public void persistAction(HostAction ha);
-
   public Stage getAction(String actionId);
 
-  public List<Stage> getAllStages(String requestId);
-
-  /**
-   * Returns all the actions that have been queued but not completed yet.
-   */
-  public List<Stage> getQueuedStages();
-
-  /**
-   * Returns all the actions that have not been queued yet.
-   */
-  public List<Stage> getNotQueuedStages();
-
-  /**
-   * Returns next stage id in the sequence, must be persisted.
-   */
-  public long getNextStageId();
+  public List<Stage> getAllStages(long requestId);
 
   public void abortOperation(long requestId);
 
@@ -50,8 +33,9 @@ public interface ActionDBAccessor {
 
   /**
    * Returns all the pending stages, including queued and not-queued.
+   * A stage is considered in progress if it is in progress for any host.
    */
-  public List<Stage> getPendingStages();
+  public List<Stage> getStagesInProgress();
 
   public void persistActions(List<Stage> stages);
 

+ 2 - 33
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java

@@ -29,13 +29,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     //this.stageId = greatest stage id in the database + 1
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#persistAction(org.apache.ambari.server.actionmanager.HostAction)
-   */
-  @Override
-  public void persistAction(HostAction ha) {
-  }
-
   /* (non-Javadoc)
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAction(java.lang.String)
    */
@@ -48,34 +41,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getAllStages(java.lang.String)
    */
   @Override
-  public List<Stage> getAllStages(String requestId) {
-    return null;
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getQueuedStages()
-   */
-  @Override
-  public List<Stage> getQueuedStages() {
+  public List<Stage> getAllStages(long requestId) {
     return null;
   }
 
-  /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getNotQueuedStages()
-   */
-  @Override
-  public List<Stage> getNotQueuedStages() {
-    return null;
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getNextStageId()
-   */
-  @Override
-  public synchronized long getNextStageId() {
-    return ++stageId ;
-  }
-
   /* (non-Javadoc)
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#abortOperation(long)
    */
@@ -96,7 +65,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    * @see org.apache.ambari.server.actionmanager.ActionDBAccessor#getPendingStages()
    */
   @Override
-  public List<Stage> getPendingStages() {
+  public List<Stage> getStagesInProgress() {
     return null;
   }
 

+ 42 - 39
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBInMemoryImpl.java

@@ -19,8 +19,6 @@ package org.apache.ambari.server.actionmanager;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.ambari.server.Role;
 
@@ -29,58 +27,63 @@ public class ActionDBInMemoryImpl implements ActionDBAccessor {
   List<Stage> stageList = new ArrayList<Stage>();
 
   @Override
-  public void persistAction(HostAction ha) {
-    // TODO Auto-generated method stub
-  }
-
-  @Override
-  public Stage getAction(String actionId) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public List<Stage> getAllStages(String requestId) {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public List<Stage> getQueuedStages() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public List<Stage> getNotQueuedStages() {
-    // TODO Auto-generated method stub
+  public synchronized Stage getAction(String actionId) {
+    for (Stage s: stageList) {
+      if (s.getActionId().equals(actionId)) {
+        return s;
+      }
+    }
     return null;
   }
-
   @Override
-  public long getNextStageId() {
-    // TODO Auto-generated method stub
-    return 0;
+  public synchronized List<Stage> getAllStages(long requestId) {
+    List<Stage> l = new ArrayList<Stage>();
+    for (Stage s: stageList) {
+      if (s.getRequestId() == requestId) {
+        l.add(s);
+      }
+    }
+    return l;
   }
 
   @Override
-  public void abortOperation(long requestId) {
-    // TODO Auto-generated method stub
+  public synchronized void abortOperation(long requestId) {
+    for (Stage s : stageList) {
+      if (s.getRequestId() == requestId) {
+        for(String host: s.getHostActions().keySet()) {
+          for (HostRoleCommand role : s.getHostActions().get(host).getRoleCommands()) {
+            role.setStatus(HostRoleStatus.ABORTED);
+          }
+        }
+      }
+    }
   }
 
   @Override
-  public void timeoutHostRole(String host, long requestId, long stageId, Role role) {
-    // TODO Auto-generated method stub
+  public synchronized void timeoutHostRole(String host, long requestId,
+      long stageId, Role role) {
+    for (Stage s : stageList) {
+      for (HostRoleCommand r : s.getHostActions().get(host).getRoleCommands()) {
+        if (r.getRole().equals(role)) {
+          r.setStatus(HostRoleStatus.TIMEDOUT);
+        }
+      }
+    }
   }
 
   @Override
-  public List<Stage> getPendingStages() {
-    // TODO Auto-generated method stub
-    return null;
+  public synchronized List<Stage> getStagesInProgress() {
+    List<Stage> l = new ArrayList<Stage>();
+    for (Stage s: stageList) {
+      if (s.isStageInProgress()) {
+        l.add(s);
+      }
+    }
+    return l;
   }
 
   @Override
-  public void persistActions(List<Stage> stages) {
+  public synchronized void persistActions(List<Stage> stages) {
     for (Stage s: stages) {
       stageList.add(s);
     }

+ 2 - 2
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java

@@ -39,9 +39,9 @@ public class ActionManager {
 
   @Inject
   public ActionManager(long schedulerSleepTime, long actionTimeout,
-      ActionQueue aq, Clusters fsm) {
+      ActionQueue aq, Clusters fsm, ActionDBAccessor db) {
     this.actionQueue = aq;
-    db = new ActionDBAccessorImpl();
+    this.db = db;
     scheduler = new ActionScheduler(schedulerSleepTime, actionTimeout, db,
         actionQueue, fsm, 2);
     this.fsm = fsm;

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -83,7 +83,7 @@ class ActionScheduler implements Runnable {
   }
 
   private void doWork() throws AmbariException {
-    List<Stage> stages = db.getPendingStages();
+    List<Stage> stages = db.getStagesInProgress();
     if (stages == null || stages.isEmpty()) {
       //Nothing to do
       return;

+ 7 - 6
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleStatus.java

@@ -18,10 +18,11 @@
 package org.apache.ambari.server.actionmanager;
 
 public enum HostRoleStatus {
-  PENDING,
-  QUEUED,
-  COMPLETED,
-  FAILED,
-  TIMEDOUT,
-  ABORTED
+  PENDING, //Not queued for a host
+  QUEUED, //Queued for a host
+  IN_PROGRESS, //Host reported it is working
+  COMPLETED, //Host reported success
+  FAILED, //Failed
+  TIMEDOUT, //Host did not respond in time
+  ABORTED //Operation was abandoned
 }

+ 13 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -124,4 +124,17 @@ public class Stage {
   public long getStartTime(String hostname) {
     return getHostAction(hostname).getStartTime();
   }
+  
+  public synchronized boolean isStageInProgress() {
+    for(String host: hostActions.keySet()) {
+      for (HostRoleCommand role : hostActions.get(host).getRoleCommands()) {
+        if (role.getStatus().equals(HostRoleStatus.PENDING) ||
+            role.getStatus().equals(HostRoleStatus.QUEUED) || 
+            role.getStatus().equals(HostRoleStatus.IN_PROGRESS)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
 }

+ 7 - 4
ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java

@@ -53,11 +53,11 @@ public class ActionQueue {
       q = new LinkedList<AgentCommand>();
       addQueue(hostname, q);
     }
-    if (q.contains(cmd)) {
-      LOG.warn("cmd already exists in the queue, not adding again");
-      return;
-    }
     synchronized (q) {
+      if (q.contains(cmd)) {
+        LOG.warn("cmd already exists in the queue, not adding again");
+        return;
+      }
       q.add(cmd);
     }
   }
@@ -74,6 +74,9 @@ public class ActionQueue {
 
   public List<AgentCommand> dequeueAll(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);
+    if (q == null) {
+      return null;
+    }
     List<AgentCommand> l = new ArrayList<AgentCommand>();
     synchronized (q) {
       while (true) {

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java

@@ -77,4 +77,12 @@ public class HeartBeat {
   public List<CommandReport> getCommandReports() {
     return this.reports;
   }
+
+  public HostStatus getNodeStatus() {
+    return nodeStatus;
+  }
+
+  public void setNodeStatus(HostStatus nodeStatus) {
+    this.nodeStatus = nodeStatus;
+  }
 }

+ 28 - 9
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -23,12 +23,16 @@ import java.util.List;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
+import org.apache.ambari.server.state.live.AgentVersion;
 import org.apache.ambari.server.state.live.Cluster;
 import org.apache.ambari.server.state.live.Clusters;
 import org.apache.ambari.server.state.live.host.Host;
 import org.apache.ambari.server.state.live.host.HostEvent;
 import org.apache.ambari.server.state.live.host.HostEventType;
+import org.apache.ambari.server.state.live.host.HostHealthyHeartbeatEvent;
+import org.apache.ambari.server.state.live.host.HostRegistrationRequestEvent;
 import org.apache.ambari.server.state.live.host.HostState;
+import org.apache.ambari.server.state.live.host.HostUnhealthyHeartbeatEvent;
 import org.apache.ambari.server.state.live.svccomphost.ServiceComponentHost;
 import org.apache.ambari.server.state.live.svccomphost.ServiceComponentHostLiveState;
 import org.apache.ambari.server.state.live.svccomphost.ServiceComponentHostState;
@@ -67,10 +71,14 @@ public class HeartBeatHandler {
     response.setResponseId(0L);
     String hostname = heartbeat.getHostname();
     Host hostObject = clusterFsm.getHost(hostname);
+    long now = System.currentTimeMillis();
     try {
-      // TODO: handle unhealthy heartbeat as well
-      hostObject.handleEvent(new HostEvent(hostname,
-          HostEventType.HOST_HEARTBEAT_HEALTHY));
+      if (heartbeat.getNodeStatus().getStatus()
+          .equals(HostStatus.Status.HEALTHY)) {
+        hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now));
+      } else {
+        hostObject.handleEvent(new HostUnhealthyHeartbeatEvent(hostname, now, null));
+      }
     } catch (InvalidStateTransitonException ex) {
       hostObject.setState(HostState.INIT);
       RegistrationCommand regCmd = new RegistrationCommand();
@@ -112,14 +120,25 @@ public class HeartBeatHandler {
   public RegistrationResponse handleRegistration(Register register)
       throws InvalidStateTransitonException, AmbariException {
     String hostname = register.getHostname();
-    List<String> roles = clusterFsm.getHostComponents(hostname);
+    long now = System.currentTimeMillis();
+    List<StatusCommand> cmds = new ArrayList<StatusCommand>();
+    for (Cluster cl : clusterFsm.getClustersForHost(hostname)) {
+      List<ServiceComponentHost> roleList = cl
+          .getServiceComponentHosts(hostname);
+      List<String> roles = new ArrayList<String>();
+      for (ServiceComponentHost sch : roleList) {
+        roles.add(sch.getServiceComponentName());
+      }
+      StatusCommand statusCmd = new StatusCommand();
+      statusCmd.setRoles(roles);
+      cmds.add(statusCmd);
+    }
     Host hostObject = clusterFsm.getHost(hostname);
     RegistrationResponse response = new RegistrationResponse();
-    StatusCommand statusCmd = new StatusCommand();
-    statusCmd.setRoles(roles);
-    response.setCommand(statusCmd);
-    hostObject.handleEvent(new HostEvent(hostname,
-        HostEventType.HOST_REGISTRATION_REQUEST));
+    response.setCommand(cmds);
+
+    hostObject.handleEvent(new HostRegistrationRequestEvent(hostname,
+        new AgentVersion("v1"), now, register.getHardwareProfile()));
     return response;
   }
 }

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java

@@ -26,6 +26,7 @@ import org.apache.ambari.server.state.live.Clusters;
 import org.apache.ambari.server.state.live.host.Host;
 import org.apache.ambari.server.state.live.host.HostEvent;
 import org.apache.ambari.server.state.live.host.HostEventType;
+import org.apache.ambari.server.state.live.host.HostHeartbeatLostEvent;
 import org.apache.ambari.server.state.live.host.HostState;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -93,7 +94,7 @@ public class HeartbeatMonitor implements Runnable {
       if (lastHeartbeat + 5*threadWakeupInterval < now) {
         LOG.warn("Hearbeat lost from host "+host);
         //Heartbeat is expired
-        hostObj.handleEvent(new HostEvent(host, HostEventType.HOST_HEARTBEAT_LOST));
+        hostObj.handleEvent(new HostHeartbeatLostEvent(host));
         //Purge action queue
         actionQueue.dequeueAll(host);
         //notify action manager

+ 20 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HostStatus.java

@@ -28,6 +28,14 @@ import javax.xml.bind.annotation.XmlType;
 @XmlAccessorType(XmlAccessType.FIELD)
 @XmlType(name = "", propOrder = {})
 public class HostStatus {
+  public HostStatus(Status status, String cause) {
+    super();
+    this.status = status;
+    this.cause = cause;
+  }
+  public HostStatus() {
+    super();
+  }
   @XmlType
   @XmlEnum
   public enum Status {
@@ -38,4 +46,16 @@ public class HostStatus {
   Status status;
   @XmlElement
   String cause;
+  public Status getStatus() {
+    return status;
+  }
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+  public String getCause() {
+    return cause;
+  }
+  public void setCause(String cause) {
+    this.cause = cause;
+  }
 }

+ 8 - 6
ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java

@@ -18,6 +18,8 @@
 
 package org.apache.ambari.server.agent;
 
+import java.util.List;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -35,8 +37,8 @@ import javax.xml.bind.annotation.XmlType;
 public class RegistrationResponse {
   @XmlElement
   private RegistrationStatus response;
-
-  private AgentCommand command = null;
+  
+  private List<StatusCommand> commands = null;
 
   public RegistrationStatus getResponseStatus() {
     return response;
@@ -46,11 +48,11 @@ public class RegistrationResponse {
     this.response = response;
   }
 
-  public AgentCommand getCommand() {
-    return command;
+  public List<StatusCommand> getCommand() {
+    return commands;
   }
 
-  public void setCommand(AgentCommand command) {
-    this.command = command;
+  public void setCommand(List<StatusCommand> commands) {
+    this.commands = commands;
   }
 }

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java

@@ -52,4 +52,12 @@ public class StatusCommand extends AgentCommand {
   public void setRoles(List<String> roles) {
     this.roles = roles;
   }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
 }

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/Cluster.java

@@ -18,6 +18,8 @@
 
 package org.apache.ambari.server.state.live;
 
+import java.util.List;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
 import org.apache.ambari.server.state.live.svccomphost.ServiceComponentHost;
@@ -95,4 +97,6 @@ public interface Cluster {
   public ServiceComponentHost getServiceComponentHost(String serviceName,
       String componentName, String hostname) throws AmbariException;
 
+  public List<ServiceComponentHost> getServiceComponentHosts(String hostname);
+
 }

+ 6 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/ClusterImpl.java

@@ -168,5 +168,11 @@ public class ClusterImpl implements Cluster {
   public long getClusterId() {
     return clusterId;
   }
+  
+  @Override
+  public List<ServiceComponentHost> getServiceComponentHosts(String hostname) {
+    // TODO Auto-generated method stub
+    return null;
+  }
 
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/host/HostEvent.java

@@ -23,7 +23,7 @@ import org.apache.ambari.server.state.fsm.event.AbstractEvent;
 /**
  * Base class for all events that affect the Host FSM
  */
-public class HostEvent extends AbstractEvent<HostEventType> {
+public abstract class HostEvent extends AbstractEvent<HostEventType> {
 
   /**
    * Hostname of the Host

+ 4 - 4
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

@@ -55,8 +55,8 @@ public class TestActionScheduler {
     ha.addHostRoleCommand(hrc);
    // ha.setManifest("1-977-manifest");
     s.addHostAction(hostname, ha);
-    when(db.getPendingStages()).thenReturn(stages);
-
+    when(db.getStagesInProgress()).thenReturn(stages);
+    
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(1000, 100, db, aq, fsm, 10000);
@@ -107,8 +107,8 @@ public class TestActionScheduler {
     ha.addHostRoleCommand(hrc);
     // ha.setManifest("1-977-manifest");
     s.addHostAction(hostname, ha);
-    when(db.getPendingStages()).thenReturn(stages);
-
+    when(db.getStagesInProgress()).thenReturn(stages);
+    
     //Keep large number of attempts so that the task is not expired finally
     //Small action timeout to test rescheduling
     ActionScheduler scheduler = new ActionScheduler(100, 100, db, aq, fsm, 3);

+ 0 - 1
ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java

@@ -25,7 +25,6 @@ import javax.ws.rs.core.MediaType;
 
 import junit.framework.Assert;
 
-import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.agent.rest.AgentResource;
 import org.apache.ambari.server.state.live.Clusters;

+ 58 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.actionmanager.ActionDBInMemoryImpl;
+import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.agent.HostStatus.Status;
+import org.apache.ambari.server.state.live.Clusters;
+import org.apache.ambari.server.state.live.host.Host;
+import org.apache.ambari.server.state.live.host.HostImpl;
+import org.apache.ambari.server.state.live.host.HostState;
+import org.junit.Test;
+
+public class TestHeartbeatHandler {
+
+  @Test
+  public void testHeartbeat() throws AmbariException {
+    ActionManager am = new ActionManager(0, 0, null, null,
+        new ActionDBInMemoryImpl());
+    Clusters fsm = mock(Clusters.class);
+    String hostname = "host1";
+    ActionQueue aq = new ActionQueue();
+    ExecutionCommand execCmd = new ExecutionCommand();
+    execCmd.setCommandId("2-34");
+    execCmd.setHostName(hostname);
+    aq.enqueue(hostname, new ExecutionCommand());
+    HeartBeatHandler handler = new HeartBeatHandler(fsm, aq, am);
+    HeartBeat hb = new HeartBeat();
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, "I am ok"));
+    hb.setHostname(hostname);
+    Host hostObject = new HostImpl(hostname);
+    hostObject.setState(HostState.UNHEALTHY);
+    when(fsm.getHost(hostname)).thenReturn(hostObject);
+    handler.handleHeartBeat(hb);
+    assertEquals(HostState.HEALTHY, hostObject.getState());
+    assertEquals(0, aq.dequeueAll(hostname).size());
+  }
+
+}