Sfoglia il codice sorgente

AMBARI-713. Initial work on Job FSM. (Contributed by hitesh)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1383107 13f79535-47bb-0310-9956-ffa450edef68
Hitesh Shah 12 anni fa
parent
commit
8d9a423e44
49 ha cambiato i file con 602 aggiunte e 264 eliminazioni
  1. 2 0
      AMBARI-666-CHANGES.txt
  2. 11 11
      ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/AgentResource.java
  3. 5 5
      ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/HardwareProfile.java
  4. 19 19
      ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/HeartBeat.java
  5. 6 6
      ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/HeartBeatResponse.java
  6. 12 12
      ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/Register.java
  7. 3 3
      ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/RegistrationResponse.java
  8. 1 1
      ambari-server/src/main/java/org/apache/ambari/controller/api/rest/HealthCheck.java
  9. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/Role.java
  10. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java
  11. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java
  12. 2 2
      ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java
  13. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/ActionQueue.java
  14. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/AgentCommand.java
  15. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/Heartbeat.java
  16. 2 2
      ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/NodeStatus.java
  17. 10 10
      ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
  18. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/controller/HostRequest.java
  19. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/controller/JobRequest.java
  20. 4 4
      ambari-server/src/main/java/org/apache/ambari/server/controller/Request.java
  21. 2 2
      ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceConfigurationRequest.java
  22. 4 4
      ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceRequest.java
  23. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/fsm/InvalidStateTransitonException.java
  24. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/state/live/AgentVersion.java
  25. 4 4
      ambari-server/src/main/java/org/apache/ambari/server/state/live/Cluster.java
  26. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/Clusters.java
  27. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/DiskInfo.java
  28. 0 83
      ambari-server/src/main/java/org/apache/ambari/server/state/live/JobImpl.java
  29. 6 5
      ambari-server/src/main/java/org/apache/ambari/server/state/live/ServiceComponentNode.java
  30. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/ServiceComponentNodeEvent.java
  31. 1 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/ServiceComponentNodeImpl.java
  32. 23 3
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/Job.java
  33. 18 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java
  34. 10 3
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobEvent.java
  35. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobEventType.java
  36. 21 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java
  37. 12 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java
  38. 295 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java
  39. 19 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobProgressUpdateEvent.java
  40. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobState.java
  41. 7 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java
  42. 19 0
      ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java
  43. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/node/Node.java
  44. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeEvent.java
  45. 6 6
      ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeHealthStatus.java
  46. 30 30
      ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeImpl.java
  47. 3 3
      ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeRegistrationRequestEvent.java
  48. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeUnhealthyHeartbeatEvent.java
  49. 21 21
      ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java

+ 2 - 0
AMBARI-666-CHANGES.txt

@@ -12,6 +12,8 @@ AMBARI-666 branch (unreleased changes)
 
   NEW FEATURES
 
+  AMBARI-713. Initial work on Job FSM. (hitesh)
+
   AMBARI-712. Action manager skeleton. (jitendra)
 
   AMBARI-710. Basic registration and heartbeat protocol implementation between

+ 11 - 11
ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/AgentResource.java

@@ -33,10 +33,10 @@ import org.apache.commons.logging.LogFactory;
 
 import com.google.inject.Inject;
 
-/** 
+/**
  * Agent Resource represents Ambari agent controller.
  * It provides API for Ambari agents to get the cluster configuration changes
- * as well as report the node attributes and state of services running the on 
+ * as well as report the node attributes and state of services running the on
  * the cluster nodes
  */
 @Path("/")
@@ -50,7 +50,7 @@ public class AgentResource {
   }
 
   /**
-   * Register information about the host (Internal API to be used for 
+   * Register information about the host (Internal API to be used for
    * Ambari Agent)
    * @response.representation.200.doc This API is invoked by Ambari agent running
    *  on a cluster to register with the server.
@@ -58,37 +58,37 @@ public class AgentResource {
    * @response.representation.406.doc Error in register message format
    * @response.representation.408.doc Request Timed out
    * @param message Register message
-   * @throws Exception 
+   * @throws Exception
    */
   @Path("register/{hostName}")
   @POST
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-  public RegistrationResponse register(Register message, 
-      @Context HttpServletRequest req)  
+  public RegistrationResponse register(Register message,
+      @Context HttpServletRequest req)
       throws WebApplicationException {
     LOG.info("Post input = " + req.toString());
     RegistrationResponse response = new RegistrationResponse();
     LOG.info("Received message from agent " + message.toString());
     return response;
   }
-  
-  /** 
+
+  /**
    * Update state of the node (Internal API to be used by Ambari agent).
-   *  
+   *
    * @response.representation.200.doc This API is invoked by Ambari agent running
    *  on a cluster to update the state of various services running on the node.
    * @response.representation.200.mediaType application/json
    * @response.representation.406.doc Error in heartbeat message format
    * @response.representation.408.doc Request Timed out
    * @param message Heartbeat message
-   * @throws Exception 
+   * @throws Exception
    */
   @Path("heartbeat/{hostname}")
   @POST
   @Consumes(MediaType.APPLICATION_JSON)
   @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
-  public HeartBeatResponse heartbeat(HeartBeat message) 
+  public HeartBeatResponse heartbeat(HeartBeat message)
       throws WebApplicationException {
     HeartBeatResponse heartBeatResponse = new HeartBeatResponse();
     try {

+ 5 - 5
ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/HardwareProfile.java

@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlType;
 
 /**
- * 
+ *
  * Data model for Ambari Agent to send hardware profile to Ambari Server.
  *
  */
@@ -114,16 +114,16 @@ public class HardwareProfile {
   @XmlElement
   private long uptime_days;
   @XmlElement
-  private long uptime_hours;   
-  
+  private long uptime_hours;
+
   public long getUpTimeDays() {
     return this.uptime_days;
   }
-  
+
   public void setUpTimeDays(long uptime_days) {
     this.uptime_days = uptime_days;
   }
-  
+
   public String toString() {
     return "memory=" + this.memorytotal + "\n" +
            "uptime_hours=" + this.uptime_hours;

+ 19 - 19
ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/HeartBeat.java

@@ -26,13 +26,13 @@ import javax.xml.bind.annotation.XmlType;
 
 
 /**
- * 
+ *
  * Data model for Ambari Agent to send heartbeat to Ambari Controller.
  *
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(name = "", propOrder = {"responseId","timestamp", 
+@XmlType(name = "", propOrder = {"responseId","timestamp",
     "hostname", "hardwareProfile", "installedRoleStates", "installScriptHash",
     "actionResults", "firstContact", "idle"})
 public class HeartBeat {
@@ -50,61 +50,61 @@ public class HeartBeat {
   private boolean firstContact;
   @XmlElement
   private boolean idle;
-  
+
   public short getResponseId() {
     return responseId;
   }
-  
+
   public void setResponseId(short responseId) {
     this.responseId=responseId;
   }
-  
+
   public long getTimestamp() {
     return timestamp;
   }
-  
+
   public String getHostname() {
     return hostname;
   }
-  
+
   public boolean getFirstContact() {
     return firstContact;
   }
-  
+
   public boolean getIdle() {
     return idle;
   }
-  
+
   public HardwareProfile getHardwareProfile() {
     return hardwareProfile;
   }
-  
-  
+
+
   public int getInstallScriptHash() {
     return installScriptHash;
   }
-  
+
   public void setTimestamp(long timestamp) {
     this.timestamp = timestamp;
   }
-  
+
   public void setHostname(String hostname) {
     this.hostname = hostname;
   }
- 
+
   public void setHardwareProfile(HardwareProfile hardwareProfile) {
-    this.hardwareProfile = hardwareProfile;    
+    this.hardwareProfile = hardwareProfile;
   }
-  
-  
+
+
   public void setFirstContact(boolean firstContact) {
     this.firstContact = firstContact;
   }
-  
+
   public void setIdle(boolean idle) {
     this.idle = idle;
   }
-  
+
   public void setInstallScriptHash(int hash) {
     this.installScriptHash = hash;
   }

+ 6 - 6
ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/HeartBeatResponse.java

@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlType;
 
 /**
- * 
+ *
  * Controller to Agent response data model.
  *
  */
@@ -37,22 +37,22 @@ public class HeartBeatResponse {
   public short responseId;
   @XmlElement
   public String clusterId;
- 
+
 
   public short getResponseId() {
     return responseId;
   }
-  
+
   public void setResponseId(short responseId) {
     this.responseId=responseId;
   }
-  
+
   public String getClusterId() {
     return clusterId;
   }
-  
+
   public void setClusterId(String clusterId) {
     this.clusterId = clusterId;
   }
-  
+
 }

+ 12 - 12
ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/Register.java

@@ -26,13 +26,13 @@ import javax.xml.bind.annotation.XmlType;
 
 
 /**
- * 
+ *
  * Data model for Ambari Agent to send heartbeat to Ambari Controller.
  *
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.FIELD)
-@XmlType(name = "", propOrder = {"responseId","timestamp", 
+@XmlType(name = "", propOrder = {"responseId","timestamp",
     "hostname", "hardwareProfile"})
 public class Register {
   @XmlElement
@@ -43,39 +43,39 @@ public class Register {
   private String hostname;
   @XmlElement
   private HardwareProfile hardwareProfile;
-  
+
   public short getResponseId() {
     return responseId;
   }
-  
+
   public void setResponseId(short responseId) {
     this.responseId=responseId;
   }
-  
+
   public long getTimestamp() {
     return timestamp;
   }
-  
+
   public String getHostname() {
     return hostname;
   }
- 
+
   public HardwareProfile getHardwareProfile() {
     return hardwareProfile;
   }
-  
+
   public void setTimestamp(long timestamp) {
     this.timestamp = timestamp;
   }
-  
+
   public void setHostname(String hostname) {
     this.hostname = hostname;
   }
- 
+
   public void setHardwareProfile(HardwareProfile hardwareProfile) {
-    this.hardwareProfile = hardwareProfile;    
+    this.hardwareProfile = hardwareProfile;
   }
-  
+
   @Override
   public String toString() {
     return "responseId=" + responseId + "\n" +

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/controller/agent/rest/RegistrationResponse.java

@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlType;
 
 /**
- * 
+ *
  * Controller to Agent response data model.
  *
  */
@@ -35,12 +35,12 @@ import javax.xml.bind.annotation.XmlType;
 public class RegistrationResponse {
   @XmlElement
   public short responseId;
- 
+
 
   public short getResponseId() {
     return responseId;
   }
-  
+
   public void setResponseId(short responseId) {
     this.responseId=responseId;
   }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/controller/api/rest/HealthCheck.java

@@ -52,5 +52,5 @@ public class HealthCheck {
     return "<html> " + "<title>" + "Status" + "</title>"
         + "<body><h1>" + status + "</body></h1>" + "</html> ";
   }
-} 
+}
 

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/Role.java

@@ -19,7 +19,7 @@
 package org.apache.ambari.server;
 
 //This enumerates all the roles that the server can handle.
-//Each component or a job maps to a particular role. 
+//Each component or a job maps to a particular role.
 public enum Role {
   ZOOKEEPER_SERVER,
   ZOOKEEPER_CLIENT,

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostAction.java

@@ -25,7 +25,7 @@ public class HostAction {
   private Map<String, String> params = null;
   private String manifest = null;
   private List<HostRoleCommand> roles;
-  
+
   public HostAction(String host) {
     this.host = host;
   }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/HostRoleCommand.java

@@ -22,7 +22,7 @@ import java.util.Map;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 
-/** 
+/**
  * This class encapsulates all the information for an action
  * on a host for a particular role. This class will be used to schedule, persist and track
  * an action.

+ 2 - 2
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -20,12 +20,12 @@ package org.apache.ambari.server.actionmanager;
 import java.util.List;
 import java.util.Map;
 
-//This class encapsulates the stage. The stage encapsulates all the information 
+//This class encapsulates the stage. The stage encapsulates all the information
 //required to persist an action.
 public class Stage {
   private long requestId;
   private long stageId = -1;
-  
+
   //Map of host to host-roles
   private Map<String, HostAction> action;
   private String logDir;

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/ActionQueue.java

@@ -47,7 +47,7 @@ public class ActionQueue {
       return q.remove();
     }
   }
-  
+
   public List<AgentCommand> dequeueAll(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);
     List<AgentCommand> l = new ArrayList<AgentCommand>();

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/AgentCommand.java

@@ -18,5 +18,5 @@
 package org.apache.ambari.server.agentprotocol;
 
 public class AgentCommand {
-  private String commandId;  
+  private String commandId;
 }

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/Heartbeat.java

@@ -26,8 +26,8 @@ public class Heartbeat {
   List<ComponentStatus> componentStatus;
   NodeInfo nodeInfo;
   NodeStatus nodeStatus;
-  
+
   public class NodeInfo {
-    
+
   }
-}
+}

+ 2 - 2
ambari-server/src/main/java/org/apache/ambari/server/agentprotocol/NodeStatus.java

@@ -22,7 +22,7 @@ public class NodeStatus {
     HEALTHY,
     UNHEALTHY
   }
-  
+
   Status status;
-  String cause;  
+  String cause;
 }

+ 10 - 10
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java

@@ -39,7 +39,7 @@ public class AmbariServer {
  public static int CLIENT_PORT = 4080;
  private Server server = null;
  public volatile boolean running = true; // true while controller runs
- 
+
  public void run() {
    server = new Server(CLIENT_PORT);
 
@@ -47,25 +47,25 @@ public class AmbariServer {
      Context root = new Context(server, "/", Context.SESSIONS);
      ServletHolder rootServlet = root.addServlet(DefaultServlet.class, "/");
      rootServlet.setInitOrder(1);
-     
+
      ServletHolder sh = new ServletHolder(ServletContainer.class);
-     sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", 
+     sh.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
        "com.sun.jersey.api.core.PackagesResourceConfig");
-     sh.setInitParameter("com.sun.jersey.config.property.packages", 
+     sh.setInitParameter("com.sun.jersey.config.property.packages",
        "org.apache.ambari.controller.api.rest");
      root.addServlet(sh, "/api/*");
      sh.setInitOrder(2);
 
      ServletHolder agent = new ServletHolder(ServletContainer.class);
-     agent.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", 
+     agent.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
        "com.sun.jersey.api.core.PackagesResourceConfig");
-     agent.setInitParameter("com.sun.jersey.config.property.packages", 
+     agent.setInitParameter("com.sun.jersey.config.property.packages",
        "org.apache.ambari.controller.agent.rest");
      root.addServlet(agent, "/agent/*");
      agent.setInitOrder(3);
-     
+
      server.setStopAtShutdown(true);
-     
+
      /*
       * Start the server after controller state is recovered.
       */
@@ -75,10 +75,10 @@ public class AmbariServer {
      LOG.info("Joined the Server");
    } catch (Exception e) {
      LOG.error("Error in the server", e);
-     
+
    }
  }
- 
+
  public void stop() throws Exception {
    try {
      server.stop();

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/controller/HostRequest.java

@@ -26,10 +26,10 @@ import java.util.Map;
  */
 public class HostRequest extends Request {
   private List<String> hosts;
-  
-  public HostRequest(long requestId, Request.Method m, List<String> hostList, 
+
+  public HostRequest(long requestId, Request.Method m, List<String> hostList,
       Map<String, String> params) {
     super(requestId, m, params);
     hosts = hostList;
-  }  
+  }
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/JobRequest.java

@@ -29,7 +29,7 @@ public class JobRequest extends Request {
 
   final private String job;
   final private String serviceName; //can be null if action is at host level
-  final private List<String> hosts; 
+  final private List<String> hosts;
 
   public JobRequest(long requestId, Request.Method m, String job, String serviceName,
       List<String> hosts, Map<String, String> params) {

+ 4 - 4
ambari-server/src/main/java/org/apache/ambari/server/controller/Request.java

@@ -20,19 +20,19 @@ package org.apache.ambari.server.controller;
 import java.util.Map;
 
 public class Request {
-  
+
   public enum Method {
     GET,
     PUT,
     POST,
     DELETE
   }
-  
+
   private long requestId;
   private Map<String, String> params;
   private Method method;
-  
-  
+
+
   public Request(long requestId, Method m) {
     this(requestId, m, null);
   }

+ 2 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceConfigurationRequest.java

@@ -27,9 +27,9 @@ import java.util.Map;
 public class ServiceConfigurationRequest extends Request {
   final private String clusterName;
   final private String serviceName;
-  
+
   //The complete set of desired configurations can be derived as
-  //properties corresponding to baseConfigVersion overridden with 
+  //properties corresponding to baseConfigVersion overridden with
   //desiredProperties.
   final private String baseConfigVersion;
   private Map<String, String> desiredProperties = null;

+ 4 - 4
ambari-server/src/main/java/org/apache/ambari/server/controller/ServiceRequest.java

@@ -31,16 +31,16 @@ public class ServiceRequest extends Request {
   private String service;
   private String configVersion;
   private String hostComponentMapVersion;
-  
+
   public class ComponentRequest {
     private String componentName;
     private Map<String, String> params = new HashMap<String, String>();
   }
-  
+
   public void addComponentRequest(ComponentRequest c) {
     components.add(c);
   }
-  
+
   public ServiceRequest(long requestId, Request.Method m, String clusterName,
       String service, String configVersion, String hostComponentMapVersion) {
     super(requestId, m);
@@ -49,6 +49,6 @@ public class ServiceRequest extends Request {
     this.configVersion = configVersion;
     this.hostComponentMapVersion = hostComponentMapVersion;
   }
-  
+
   private List<ComponentRequest> components = new ArrayList<ComponentRequest>();
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/fsm/InvalidStateTransitonException.java

@@ -19,7 +19,7 @@
 package org.apache.ambari.server.state.fsm;
 
 /**
- * Exception thrown when a StateMachine encounters an invalid 
+ * Exception thrown when a StateMachine encounters an invalid
  * event at its current state.
  */
 @SuppressWarnings("serial")

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/live/AgentVersion.java

@@ -24,7 +24,7 @@ package org.apache.ambari.server.state.live;
 public class AgentVersion {
 
   private final String version;
-  
+
   public AgentVersion(String version) {
     this.version = version;
   }
@@ -35,5 +35,5 @@ public class AgentVersion {
   public String getVersion() {
     return version;
   }
-  
-} 
+
+}

+ 4 - 4
ambari-server/src/main/java/org/apache/ambari/server/state/live/Cluster.java

@@ -30,14 +30,14 @@ public interface Cluster {
    * @return
    */
   public NodeState getNodeState(String nodeName);
-  
+
   /**
    * Set the State for a given Node
    * @param nodeName Node's hostname for which state is to be set
    * @param state NodeState to set
    */
   public void setNodeState(String nodeName, NodeState state);
-  
+
   /**
    * Send event to the given Node
    * @param nodeName Node's hostname
@@ -45,7 +45,7 @@ public interface Cluster {
    */
   public void handleNodeEvent(String nodeName, NodeEvent event)
       throws InvalidStateTransitonException;
-  
+
   /**
    * Get the State for a given ServiceComponentNode
    * @param service Service name
@@ -77,5 +77,5 @@ public interface Cluster {
   public void handleServiceComponentNodeEvent(String service,
       String serviceComponent, String nodeName,
       ServiceComponentNodeEvent event) throws InvalidStateTransitonException;
-  
+
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/Clusters.java

@@ -26,5 +26,5 @@ public interface Clusters {
    * @return
    */
   public Cluster getCluster(String clusterName);
-  
+
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/DiskInfo.java

@@ -44,7 +44,7 @@ public class DiskInfo {
    * Capacity of the disk in bytes
    */
   long totalCapacityBytes;
-  
+
   /**
    * Current capacity in bytes
    */

+ 0 - 83
ambari-server/src/main/java/org/apache/ambari/server/state/live/JobImpl.java

@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.server.state.live;
-
-import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
-import org.apache.ambari.server.state.fsm.StateMachine;
-import org.apache.ambari.server.state.fsm.StateMachineFactory;
-
-public class JobImpl implements Job {
-
-  private static final StateMachineFactory
-    <JobImpl, JobState, JobEventType, JobEvent>
-      stateMachineFactory
-        = new StateMachineFactory<JobImpl, JobState,
-          JobEventType, JobEvent>
-            (JobState.INIT)
-
-    // define the state machine of a Job
-
-    .addTransition(JobState.INIT, JobState.IN_PROGRESS,
-        JobEventType.JOB_IN_PROGRESS)
-    .addTransition(JobState.IN_PROGRESS, JobState.IN_PROGRESS,
-        JobEventType.JOB_IN_PROGRESS)
-    .addTransition(JobState.IN_PROGRESS, JobState.COMPLETED,
-        JobEventType.JOB_COMPLETED)
-    .addTransition(JobState.IN_PROGRESS, JobState.FAILED,
-        JobEventType.JOB_FAILED)
-    .addTransition(JobState.COMPLETED, JobState.INIT,
-        JobEventType.JOB_INIT)
-    .addTransition(JobState.FAILED, JobState.INIT,
-        JobEventType.JOB_INIT)
-    .installTopology();
-
-  private final StateMachine<JobState, JobEventType, JobEvent>
-      stateMachine;
-
-  public JobImpl() {
-    super();
-    this.stateMachine = stateMachineFactory.make(this);
-  }
-
-  @Override
-  public JobState getState() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public void setState(JobState state) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void handleEvent(JobEvent event)
-      throws InvalidStateTransitonException {
-    // TODO
-    stateMachine.doTransition(event.getType(), event);
-  }
-
-  @Override
-  public JobId getId() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-}

+ 6 - 5
ambari-server/src/main/java/org/apache/ambari/server/state/live/ServiceComponentNode.java

@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.ambari.server.state.ConfigVersion;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
+import org.apache.ambari.server.state.live.job.Job;
 
 
 public interface ServiceComponentNode {
@@ -31,14 +32,14 @@ public interface ServiceComponentNode {
    * @return Name of the ServiceComponent
    */
   public String getServiceComponentName();
-  
+
   /**
    * Get the Node this object maps to
    * @return Node's hostname
    */
   public String getNodeName();
-  
-  
+
+
   /**
    * Get the Config Version
    * @return ConfigVersion
@@ -51,8 +52,8 @@ public interface ServiceComponentNode {
    * @return List of Jobs
    */
   public List<Job> getJobs();
-  
-  
+
+
   /**
    * Get ServiceComponent-Node State
    * @return ServiceComponentNodeState

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/ServiceComponentNodeEvent.java

@@ -35,7 +35,7 @@ public class ServiceComponentNodeEvent
    * Hostname of the Node that this event relates to
    */
   final String nodeName;
-  
+
   public ServiceComponentNodeEvent(ServiceComponentNodeEventType type,
       String serviceComponentName, String nodeName) {
     super(type);

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/ServiceComponentNodeImpl.java

@@ -24,6 +24,7 @@ import org.apache.ambari.server.state.ConfigVersion;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
 import org.apache.ambari.server.state.fsm.StateMachine;
 import org.apache.ambari.server.state.fsm.StateMachineFactory;
+import org.apache.ambari.server.state.live.job.Job;
 
 public class ServiceComponentNodeImpl implements ServiceComponentNode {
 

+ 23 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/live/Job.java → ambari-server/src/main/java/org/apache/ambari/server/state/live/job/Job.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.server.state.live;
+package org.apache.ambari.server.state.live.job;
 
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
 
@@ -31,8 +31,28 @@ public interface Job {
   // TODO requires some form of JobType to ensure only one running
   // job per job type
   // There may be gotchas such as de-commissioning should be allowed to happen
-  // on more than one node at a time  
-  
+  // on more than one node at a time
+
+
+  /**
+   * Get Start Time of the job
+   * @return Start time as a unix timestamp
+   */
+  public long getStartTime();
+
+  /**
+   * Get the last update time of the Job when its progress status
+   * was updated
+   * @return Last Update Time as a unix timestamp
+   */
+  public long getLastUpdateTime();
+
+  /**
+   * Time when the Job completed
+   * @return Completion Time as a unix timestamp
+   */
+  public long getCompletionTime();
+
   /**
    * Get the current state of the Job
    * @return JobState

+ 18 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobCompletedEvent.java

@@ -0,0 +1,18 @@
+package org.apache.ambari.server.state.live.job;
+
+public class JobCompletedEvent extends JobEvent {
+
+  private final long completionTime;
+
+  public JobCompletedEvent(JobId jobId, long completionTime) {
+    super(JobEventType.JOB_COMPLETED, jobId);
+    this.completionTime = completionTime;
+  }
+
+  /**
+   * @return the completionTime
+   */
+  public long getCompletionTime() {
+    return completionTime;
+  }
+}

+ 10 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/live/JobEvent.java → ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobEvent.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.server.state.live;
+package org.apache.ambari.server.state.live.job;
 
 import org.apache.ambari.server.state.fsm.event.AbstractEvent;
 
@@ -28,10 +28,17 @@ public class JobEvent extends AbstractEvent<JobEventType> {
   /**
    * JobId identifying the job
    */
-  final JobId jobId;
-  
+  private final JobId jobId;
+
   public JobEvent(JobEventType type, JobId jobId) {
     super(type);
     this.jobId = jobId;
   }
+
+  /**
+   * @return the jobId
+   */
+  public JobId getJobId() {
+    return jobId;
+  }
 }

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/JobEventType.java → ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobEventType.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.server.state.live;
+package org.apache.ambari.server.state.live.job;
 
 public enum JobEventType {
   /**

+ 21 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobFailedEvent.java

@@ -0,0 +1,21 @@
+package org.apache.ambari.server.state.live.job;
+
+public class JobFailedEvent extends JobEvent {
+
+  private final long completionTime;
+
+  // TODO
+  // need to add job failed reason
+
+  public JobFailedEvent(JobId jobId, long completionTime) {
+    super(JobEventType.JOB_FAILED, jobId);
+    this.completionTime = completionTime;
+  }
+
+  /**
+   * @return the completionTime
+   */
+  public long getCompletionTime() {
+    return completionTime;
+  }
+}

+ 12 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/JobId.java → ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobId.java

@@ -16,9 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.server.state.live;
+package org.apache.ambari.server.state.live.job;
 
 // TODO
 public class JobId {
 
+  final long jobId;
+
+  final JobType jobType;
+
+  public JobId(long jobId, JobType jobType) {
+    super();
+    this.jobId = jobId;
+    this.jobType = jobType;
+  }
+
+
 }

+ 295 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobImpl.java

@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.state.live.job;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
+import org.apache.ambari.server.state.fsm.SingleArcTransition;
+import org.apache.ambari.server.state.fsm.StateMachine;
+import org.apache.ambari.server.state.fsm.StateMachineFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JobImpl implements Job {
+
+  private static final Log LOG = LogFactory.getLog(JobImpl.class);
+
+  private final Lock readLock;
+  private final Lock writeLock;
+
+  private JobId id;
+
+  private long startTime;
+  private long lastUpdateTime;
+  private long completionTime;
+
+  // TODO
+  // need to add job failed reason
+
+  private static final StateMachineFactory
+    <JobImpl, JobState, JobEventType, JobEvent>
+      stateMachineFactory
+        = new StateMachineFactory<JobImpl, JobState,
+          JobEventType, JobEvent>
+            (JobState.INIT)
+
+    // define the state machine of a Job
+
+    .addTransition(JobState.INIT, JobState.IN_PROGRESS,
+        JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
+    .addTransition(JobState.IN_PROGRESS, JobState.IN_PROGRESS,
+        JobEventType.JOB_IN_PROGRESS, new JobProgressUpdateTransition())
+    .addTransition(JobState.IN_PROGRESS, JobState.COMPLETED,
+        JobEventType.JOB_COMPLETED, new JobCompletedTransition())
+    .addTransition(JobState.IN_PROGRESS, JobState.FAILED,
+        JobEventType.JOB_FAILED, new JobFailedTransition())
+    .addTransition(JobState.COMPLETED, JobState.INIT,
+        JobEventType.JOB_INIT, new NewJobTransition())
+    .addTransition(JobState.FAILED, JobState.INIT,
+        JobEventType.JOB_INIT, new NewJobTransition())
+    .installTopology();
+
+  private final StateMachine<JobState, JobEventType, JobEvent>
+      stateMachine;
+
+  public JobImpl(JobId id) {
+    super();
+    this.id = id;
+    this.stateMachine = stateMachineFactory.make(this);
+    ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+    startTime = -1;
+    lastUpdateTime = -1;
+    completionTime = -1;
+  }
+
+  static class NewJobTransition
+     implements SingleArcTransition<JobImpl, JobEvent> {
+
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      NewJobEvent e = (NewJobEvent) event;
+      // TODO audit logs
+      job.setId(e.getJobId());
+      job.setStartTime(e.getStartTime());
+      LOG.info("Launching a new Job"
+          + ", jobId=" + job.getId()
+          + ", startTime=" + job.getStartTime());
+    }
+  }
+
+  static class JobProgressUpdateTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobProgressUpdateEvent e = (JobProgressUpdateEvent) event;
+      job.setLastUpdateTime(e.getProgressUpdateTime());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Progress update for Job"
+            + ", jobId=" + job.getId()
+            + ", startTime=" + job.getStartTime()
+            + ", lastUpdateTime=" + job.getLastUpdateTime());
+      }
+    }
+  }
+
+  static class JobCompletedTransition
+     implements SingleArcTransition<JobImpl, JobEvent> {
+
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      // TODO audit logs
+      JobCompletedEvent e = (JobCompletedEvent) event;
+      job.setCompletionTime(e.getCompletionTime());
+      job.setLastUpdateTime(e.getCompletionTime());
+
+      LOG.info("Job completed successfully"
+          + ", jobId=" + job.getId()
+          + ", startTime=" + job.getStartTime()
+          + ", completionTime=" + job.getCompletionTime());
+    }
+  }
+
+  static class JobFailedTransition
+      implements SingleArcTransition<JobImpl, JobEvent> {
+
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      // TODO audit logs
+      JobFailedEvent e = (JobFailedEvent) event;
+      job.setCompletionTime(e.getCompletionTime());
+      job.setLastUpdateTime(e.getCompletionTime());
+      LOG.info("Job failed to complete"
+          + ", jobId=" + job.getId()
+          + ", startTime=" + job.getStartTime()
+          + ", completionTime=" + job.getCompletionTime());
+    }
+  }
+
+
+  @Override
+  public JobState getState() {
+    try {
+      readLock.lock();
+      return stateMachine.getCurrentState();
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void setState(JobState state) {
+    try {
+      writeLock.lock();
+      stateMachine.setCurrentState(state);
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void handleEvent(JobEvent event)
+      throws InvalidStateTransitonException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling Job event, eventType=" + event.getType().name()
+          + ", event=" + event.toString());
+    }
+    JobState oldState = getState();
+    try {
+      writeLock.lock();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle Job event at current state"
+            + ", jobId=" + this.getId()
+            + ", currentState=" + oldState
+            + ", eventType=" + event.getType()
+            + ", event=" + event);
+        throw e;
+      }
+    }
+    finally {
+      writeLock.unlock();
+    }
+    if (oldState != getState()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Job transitioned to a new state"
+            + ", jobId=" + this.getId()
+            + ", oldState=" + oldState
+            + ", currentState=" + getState()
+            + ", eventType=" + event.getType().name()
+            + ", event=" + event);
+      }
+    }
+  }
+
+  @Override
+  public JobId getId() {
+    try {
+      readLock.lock();
+      return id;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  private void setId(JobId id) {
+    try {
+      writeLock.lock();
+      this.id = id;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public long getStartTime() {
+    try {
+      readLock.lock();
+      return startTime;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setStartTime(long startTime) {
+    try {
+      writeLock.lock();
+      this.startTime = startTime;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public long getLastUpdateTime() {
+    try {
+      readLock.lock();
+      return lastUpdateTime;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setLastUpdateTime(long lastUpdateTime) {
+    try {
+      writeLock.lock();
+      this.lastUpdateTime = lastUpdateTime;
+    }
+    finally {
+      writeLock.unlock();
+    }
+
+  }
+
+  @Override
+  public long getCompletionTime() {
+    try {
+      readLock.lock();
+      return completionTime;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  public void setCompletionTime(long completionTime) {
+    try {
+      writeLock.lock();
+      this.completionTime = completionTime;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
+
+}

+ 19 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobProgressUpdateEvent.java

@@ -0,0 +1,19 @@
+package org.apache.ambari.server.state.live.job;
+
+public class JobProgressUpdateEvent extends JobEvent {
+
+  private final long progressUpdateTime;
+
+  public JobProgressUpdateEvent(JobId jobId, long progressUpdateTime) {
+    super(JobEventType.JOB_IN_PROGRESS, jobId);
+    this.progressUpdateTime = progressUpdateTime;
+  }
+
+  /**
+   * @return the progressUpdateTime
+   */
+  public long getProgressUpdateTime() {
+    return progressUpdateTime;
+  }
+
+}

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/JobState.java → ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobState.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.ambari.server.state.live;
+package org.apache.ambari.server.state.live.job;
 
 public enum JobState {
   /**

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/JobType.java

@@ -0,0 +1,7 @@
+package org.apache.ambari.server.state.live.job;
+
+public class JobType {
+
+  public String jobName;
+
+}

+ 19 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/live/job/NewJobEvent.java

@@ -0,0 +1,19 @@
+package org.apache.ambari.server.state.live.job;
+
+public class NewJobEvent extends JobEvent {
+
+  private final long startTime;
+
+  public NewJobEvent(JobId jobId, long creationTime) {
+    super(JobEventType.JOB_INIT, jobId);
+    this.startTime = creationTime;
+  }
+
+  /**
+   * @return the start time of the Job
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+}

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/node/Node.java

@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitonException;
 import org.apache.ambari.server.state.live.AgentVersion;
 import org.apache.ambari.server.state.live.DiskInfo;
-import org.apache.ambari.server.state.live.Job;
+import org.apache.ambari.server.state.live.job.Job;
 
 public interface Node {
 

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeEvent.java

@@ -29,7 +29,7 @@ public class NodeEvent extends AbstractEvent<NodeEventType> {
    * Hostname of the Node
    */
   final String nodeName;
-  
+
   public NodeEvent(String nodeName, NodeEventType type) {
     super(type);
     this.nodeName = nodeName;

+ 6 - 6
ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeHealthStatus.java

@@ -21,9 +21,9 @@ package org.apache.ambari.server.state.live.node;
 public class NodeHealthStatus {
 
   private HealthStatus healthStatus;
-  
+
   private String healthReport;
-  
+
   public NodeHealthStatus(HealthStatus healthStatus, String healthReport) {
     super();
     this.healthStatus = healthStatus;
@@ -33,11 +33,11 @@ public class NodeHealthStatus {
   public synchronized HealthStatus getHealthStatus() {
     return healthStatus;
   }
-  
+
   public synchronized void setHealthStatus(HealthStatus healthStatus) {
     this.healthStatus = healthStatus;
   }
-  
+
   public synchronized void setHealthReport(String healthReport) {
     this.healthReport = healthReport;
   }
@@ -45,11 +45,11 @@ public class NodeHealthStatus {
   public synchronized String getHealthReport() {
     return healthReport;
   }
-  
+
   public static enum HealthStatus {
     UNKNOWN,
     HEALTHY,
     UNHEALTHY
   }
-  
+
 }

+ 30 - 30
ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeImpl.java

@@ -32,7 +32,7 @@ import org.apache.ambari.server.state.fsm.StateMachine;
 import org.apache.ambari.server.state.fsm.StateMachineFactory;
 import org.apache.ambari.server.state.live.AgentVersion;
 import org.apache.ambari.server.state.live.DiskInfo;
-import org.apache.ambari.server.state.live.Job;
+import org.apache.ambari.server.state.live.job.Job;
 import org.apache.ambari.server.state.live.node.NodeHealthStatus.HealthStatus;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +43,7 @@ public class NodeImpl implements Node {
 
   private final Lock readLock;
   private final Lock writeLock;
-  
+
   /**
    * Node hostname
    */
@@ -58,17 +58,17 @@ public class NodeImpl implements Node {
    * Node IP if ipv6 interface available
    */
   private String ipv6;
-  
+
   /**
    * Count of cores on Node
    */
   private int cpuCount;
-  
+
   /**
    * Os Architecture
    */
   private String osArch;
-  
+
   /**
    * OS Type
    */
@@ -83,7 +83,7 @@ public class NodeImpl implements Node {
    * Amount of available memory for the Node
    */
   private long availableMemBytes;
-  
+
   /**
    * Amount of physical memory for the Node
    */
@@ -108,7 +108,7 @@ public class NodeImpl implements Node {
    * Rack to which the Node belongs to
    */
   private String rackInfo;
-  
+
   /**
    * Additional Node attributes
    */
@@ -118,12 +118,12 @@ public class NodeImpl implements Node {
    * Version of agent running on the Node
    */
   private AgentVersion agentVersion;
-  
+
   /**
    * Node Health Status
    */
   private NodeHealthStatus healthStatus;
-  
+
   private static final StateMachineFactory
     <NodeImpl, NodeState, NodeEventType, NodeEvent>
       stateMachineFactory
@@ -133,17 +133,17 @@ public class NodeImpl implements Node {
    // define the state machine of a Node
 
    // Transition from INIT state
-   // when the initial registration request is received        
+   // when the initial registration request is received
    .addTransition(NodeState.INIT, NodeState.WAITING_FOR_VERIFICATION,
        NodeEventType.NODE_REGISTRATION_REQUEST, new NodeRegistrationReceived())
 
    // Transition from WAITING_FOR_VERIFICATION state
-   // when the node is authenticated    
+   // when the node is authenticated
    .addTransition(NodeState.WAITING_FOR_VERIFICATION, NodeState.VERIFIED,
        NodeEventType.NODE_VERIFIED, new NodeVerifiedTransition())
 
    // Transitions from VERIFIED state
-   // when a normal heartbeat is received    
+   // when a normal heartbeat is received
    .addTransition(NodeState.VERIFIED, NodeState.HEALTHY,
        NodeEventType.NODE_HEARTBEAT_HEALTHY,
        new NodeBecameHealthyTransition())
@@ -151,13 +151,13 @@ public class NodeImpl implements Node {
    .addTransition(NodeState.VERIFIED, NodeState.HEARTBEAT_LOST,
        NodeEventType.NODE_HEARTBEAT_TIMED_OUT,
        new NodeHeartbeatTimedOutTransition())
-   // when a heartbeart denoting node as unhealthy is received    
+   // when a heartbeart denoting node as unhealthy is received
    .addTransition(NodeState.VERIFIED, NodeState.UNHEALTHY,
        NodeEventType.NODE_HEARTBEAT_UNHEALTHY,
-       new NodeBecameUnhealthyTransition())       
-       
+       new NodeBecameUnhealthyTransition())
+
    // Transitions from HEALTHY state
-   // when a normal heartbeat is received    
+   // when a normal heartbeat is received
    .addTransition(NodeState.HEALTHY, NodeState.HEALTHY,
        NodeEventType.NODE_HEARTBEAT_HEALTHY,
        new NodeHeartbeatReceivedTransition())
@@ -165,17 +165,17 @@ public class NodeImpl implements Node {
    .addTransition(NodeState.HEALTHY, NodeState.HEARTBEAT_LOST,
        NodeEventType.NODE_HEARTBEAT_TIMED_OUT,
        new NodeHeartbeatTimedOutTransition())
-   // when a heartbeart denoting node as unhealthy is received    
+   // when a heartbeart denoting node as unhealthy is received
    .addTransition(NodeState.HEALTHY, NodeState.UNHEALTHY,
        NodeEventType.NODE_HEARTBEAT_UNHEALTHY,
        new NodeBecameUnhealthyTransition())
 
    // Transitions from UNHEALTHY state
-   // when a normal heartbeat is received    
+   // when a normal heartbeat is received
    .addTransition(NodeState.UNHEALTHY, NodeState.HEALTHY,
        NodeEventType.NODE_HEARTBEAT_HEALTHY,
        new NodeBecameHealthyTransition())
-   // when a heartbeart denoting node as unhealthy is received    
+   // when a heartbeart denoting node as unhealthy is received
    .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
        NodeEventType.NODE_HEARTBEAT_UNHEALTHY,
        new NodeHeartbeatReceivedTransition())
@@ -185,11 +185,11 @@ public class NodeImpl implements Node {
        new NodeHeartbeatTimedOutTransition())
 
    // Transitions from HEARTBEAT_LOST state
-   // when a normal heartbeat is received    
+   // when a normal heartbeat is received
    .addTransition(NodeState.HEARTBEAT_LOST, NodeState.HEALTHY,
        NodeEventType.NODE_HEARTBEAT_HEALTHY,
        new NodeBecameHealthyTransition())
-   // when a heartbeart denoting node as unhealthy is received    
+   // when a heartbeart denoting node as unhealthy is received
    .addTransition(NodeState.HEARTBEAT_LOST, NodeState.UNHEALTHY,
        NodeEventType.NODE_HEARTBEAT_UNHEALTHY,
        new NodeBecameUnhealthyTransition())
@@ -220,7 +220,7 @@ public class NodeImpl implements Node {
       node.setAgentVersion(e.agentVersion);
     }
   }
-  
+
   static class NodeVerifiedTransition
       implements SingleArcTransition<NodeImpl, NodeEvent> {
 
@@ -229,7 +229,7 @@ public class NodeImpl implements Node {
       // TODO Auto-generated method stub
     }
   }
-  
+
   static class NodeHeartbeatReceivedTransition
     implements SingleArcTransition<NodeImpl, NodeEvent> {
 
@@ -247,12 +247,12 @@ public class NodeImpl implements Node {
           break;
       }
       if (0 == heartbeatTime) {
-        // TODO handle error        
+        // TODO handle error
       }
       node.setLastHeartbeatTime(heartbeatTime);
     }
-  }  
-  
+  }
+
   static class NodeBecameHealthyTransition
       implements SingleArcTransition<NodeImpl, NodeEvent> {
 
@@ -264,7 +264,7 @@ public class NodeImpl implements Node {
       LOG.info("Node transitioned to a healthy state"
           + ", node=" + e.nodeName
           + ", heartbeatTime=" + e.getHeartbeatTime());
-      node.getHealthStatus().setHealthStatus(HealthStatus.HEALTHY);      
+      node.getHealthStatus().setHealthStatus(HealthStatus.HEALTHY);
     }
   }
 
@@ -296,7 +296,7 @@ public class NodeImpl implements Node {
           + ", lastHeartbeatTime=" + node.getLastHeartbeatTime());
       node.getHealthStatus().setHealthStatus(HealthStatus.UNKNOWN);
     }
-  } 
+  }
 
   void importNodeInfo(NodeInfo nodeInfo) {
     try {
@@ -315,10 +315,10 @@ public class NodeImpl implements Node {
       this.hostAttributes = nodeInfo.hostAttributes;
     }
     finally {
-      writeLock.unlock();      
+      writeLock.unlock();
     }
   }
-   
+
   @Override
   public NodeState getState() {
     try {

+ 3 - 3
ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeRegistrationRequestEvent.java

@@ -23,11 +23,11 @@ import org.apache.ambari.server.state.live.AgentVersion;
 public class NodeRegistrationRequestEvent extends NodeEvent {
 
   final long registrationTime;
-  
+
   final NodeInfo nodeInfo;
-  
+
   final AgentVersion agentVersion;
-  
+
   public NodeRegistrationRequestEvent(String nodeName,
       AgentVersion agentVersion, long registrationTime, NodeInfo nodeInfo) {
     super(nodeName, NodeEventType.NODE_REGISTRATION_REQUEST);

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/live/node/NodeUnhealthyHeartbeatEvent.java

@@ -21,7 +21,7 @@ package org.apache.ambari.server.state.live.node;
 public class NodeUnhealthyHeartbeatEvent extends NodeEvent {
 
   private final long heartbeatTime;
-  
+
   private final NodeHealthStatus healthStatus;
 
   public NodeUnhealthyHeartbeatEvent(String nodeName, long heartbeatTime,

+ 21 - 21
ambari-server/src/test/java/org/apache/ambari/server/state/live/node/TestNodeImpl.java

@@ -14,7 +14,7 @@ import org.apache.ambari.server.state.live.node.NodeHealthStatus.HealthStatus;
 
 public class TestNodeImpl {
 
-  
+
   @Test
   public void testNodeInfoImport() {
     NodeInfo info = new NodeInfo();
@@ -22,7 +22,7 @@ public class TestNodeImpl {
     info.cpuCount = 10;
     info.disksInfo = new ArrayList<DiskInfo>();
     info.disksInfo.add(new DiskInfo("/dev/sda", "ext3", "/mnt/disk1",
-        5000000, 4000000));    
+        5000000, 4000000));
     info.hostAttributes = new HashMap<String, String>();
     info.hostName = "foo";
     info.ipv4 = "fip_4";
@@ -31,10 +31,10 @@ public class TestNodeImpl {
     info.osInfo = "os_info";
     info.rackInfo = "/default-rack";
     info.totalMemBytes = 200;
-    
+
     NodeImpl node = new NodeImpl();
     node.importNodeInfo(info);
-    
+
     Assert.assertEquals(info.hostName, node.getHostName());
     Assert.assertEquals(info.ipv4, node.getIPv4());
     Assert.assertEquals(info.ipv6, node.getIPv6());
@@ -48,7 +48,7 @@ public class TestNodeImpl {
     Assert.assertEquals(info.osType, node.getOsType());
     Assert.assertEquals(info.osInfo, node.getOsInfo());
     Assert.assertEquals(info.rackInfo, node.getRackInfo());
-    
+
   }
 
   private void registerNode(NodeImpl node) throws Exception {
@@ -57,7 +57,7 @@ public class TestNodeImpl {
     info.cpuCount = 10;
     info.disksInfo = new ArrayList<DiskInfo>();
     info.disksInfo.add(new DiskInfo("/dev/sda", "ext3", "/mnt/disk1",
-        5000000, 4000000));    
+        5000000, 4000000));
     info.hostAttributes = new HashMap<String, String>();
     info.hostName = "foo";
     info.ipv4 = "fip_4";
@@ -68,8 +68,8 @@ public class TestNodeImpl {
     info.totalMemBytes = 200;
     AgentVersion agentVersion = null;
     long currentTime = System.currentTimeMillis();
-    
-    NodeRegistrationRequestEvent e = 
+
+    NodeRegistrationRequestEvent e =
         new NodeRegistrationRequestEvent("foo", agentVersion, currentTime,
             info);
     node.handleEvent(e);
@@ -78,13 +78,13 @@ public class TestNodeImpl {
 
   private void verifyNode(NodeImpl node) throws Exception {
     NodeVerifiedEvent e = new NodeVerifiedEvent(node.getHostName());
-    node.handleEvent(e);    
+    node.handleEvent(e);
   }
-  
+
   private void verifyNodeState(NodeImpl node, NodeState state) {
     Assert.assertEquals(node.getState(), state);
   }
-  
+
   private void sendHealthyHeartbeat(NodeImpl node, long counter) throws Exception {
     NodeHealthyHeartbeatEvent e = new NodeHealthyHeartbeatEvent(
         node.getHostName(), counter);
@@ -98,25 +98,25 @@ public class TestNodeImpl {
         node.getHostName(), counter, healthStatus);
     node.handleEvent(e);
   }
-  
+
   private void timeoutNode(NodeImpl node) throws Exception {
     NodeHeartbeatTimedOutEvent e = new NodeHeartbeatTimedOutEvent(
         node.getHostName());
     node.handleEvent(e);
   }
-  
+
   @Test
   public void testNodeFSMInit() {
     NodeImpl node = new NodeImpl();
     verifyNodeState(node, NodeState.INIT);
   }
-  
+
   @Test
   public void testNodeRegistrationFlow() throws Exception {
     NodeImpl node = new NodeImpl();
     registerNode(node);
     verifyNodeState(node, NodeState.WAITING_FOR_VERIFICATION);
-    
+
     boolean exceptionThrown = false;
     try {
       registerNode(node);
@@ -151,12 +151,12 @@ public class TestNodeImpl {
 
     // TODO need to verify audit logs generated
     // TODO need to verify health status updated properly
-    
+
     long counter = 0;
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
     Assert.assertEquals(node.getLastHeartbeatTime(), counter);
-    
+
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
     Assert.assertEquals(node.getLastHeartbeatTime(), counter);
@@ -180,7 +180,7 @@ public class TestNodeImpl {
     Assert.assertEquals(node.getLastHeartbeatTime(), counter);
     Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
         HealthStatus.HEALTHY);
-    
+
     timeoutNode(node);
     verifyNodeState(node, NodeState.HEARTBEAT_LOST);
     Assert.assertEquals(node.getLastHeartbeatTime(), counter);
@@ -198,7 +198,7 @@ public class TestNodeImpl {
     Assert.assertEquals(node.getLastHeartbeatTime(), counter);
     Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
         HealthStatus.UNHEALTHY);
-    
+
     timeoutNode(node);
     verifyNodeState(node, NodeState.HEARTBEAT_LOST);
     Assert.assertEquals(node.getLastHeartbeatTime(), counter);
@@ -207,9 +207,9 @@ public class TestNodeImpl {
 
     sendHealthyHeartbeat(node, ++counter);
     verifyNodeState(node, NodeState.HEALTHY);
-    Assert.assertEquals(node.getLastHeartbeatTime(), counter);    
+    Assert.assertEquals(node.getLastHeartbeatTime(), counter);
     Assert.assertEquals(node.getHealthStatus().getHealthStatus(),
         HealthStatus.HEALTHY);
-    
+
   }
 }