Parcourir la source

AMBARI-904. Ensure state changes only happen after actionmanager persists actions (Contributed by Hitesh)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/branches/AMBARI-666@1401932 13f79535-47bb-0310-9956-ffa450edef68
Hitesh Shah il y a 12 ans
Parent
commit
6c3c0c3a1a

+ 3 - 0
AMBARI-666-CHANGES.txt

@@ -354,6 +354,9 @@ AMBARI-666 branch (unreleased changes)
 
   BUG FIXES
 
+  AMBARI-904. Ensure state changes only happen after actionmanager persists
+  actions. (hitesh)
+
   AMBARI-905. Fix puppet site creation with flattening of execution commands
   send from the server. (mahadev)
 

+ 12 - 9
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java

@@ -93,8 +93,11 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   public void abortOperation(long requestId) {
-    Collection<HostRoleStatus> sourceStatuses = Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS, HostRoleStatus.PENDING);
-    int result = hostRoleCommandDAO.updateStatusByRequestId(requestId, HostRoleStatus.ABORTED, sourceStatuses);
+    Collection<HostRoleStatus> sourceStatuses =
+        Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS,
+            HostRoleStatus.PENDING);
+    int result = hostRoleCommandDAO.updateStatusByRequestId(requestId,
+        HostRoleStatus.ABORTED, sourceStatuses);
     LOG.info("Aborted {} commands", result);
   }
 
@@ -103,8 +106,10 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
    */
   @Override
   @Transactional
-  public void timeoutHostRole(String host, long requestId, long stageId, Role role) {
-    List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
+  public void timeoutHostRole(String host, long requestId, long stageId,
+      Role role) {
+    List<HostRoleCommandEntity> commands =
+        hostRoleCommandDAO.findByHostRole(host, requestId, stageId, role);
     for (HostRoleCommandEntity command : commands) {
       command.setStatus(HostRoleStatus.TIMEDOUT);
       hostRoleCommandDAO.merge(command);
@@ -117,10 +122,9 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
   @Override
   public List<Stage> getStagesInProgress() {
     List<Stage> stages = new ArrayList<Stage>();
-    List<HostRoleStatus> statuses = new ArrayList<HostRoleStatus>();
-    statuses.add(HostRoleStatus.PENDING);
-    statuses.add(HostRoleStatus.QUEUED);
-    statuses.add(HostRoleStatus.IN_PROGRESS);
+    List<HostRoleStatus> statuses =
+        Arrays.asList(HostRoleStatus.QUEUED, HostRoleStatus.IN_PROGRESS,
+            HostRoleStatus.PENDING);
     for (StageEntity stageEntity : stageDAO.findByCommandStatuses(statuses)) {
       stages.add(stageFactory.createExisting(stageEntity));
     }
@@ -136,7 +140,6 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding stages to DB, stageCount=" + stages.size());
     }
-
     for (Stage stage : stages) {
       StageEntity stageEntity = stage.constructNewPersistenceEntity();
       Cluster cluster;

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -82,7 +82,8 @@ public class HeartBeatHandler {
     LOG.info("Heartbeat received from host " + heartbeat.getHostname()
         + " responseId=" + heartbeat.getResponseId());
     Host hostObject = clusterFsm.getHost(hostname);
-
+    // FIXME need to remove this hack
+    hostObject.refresh();
     long now = System.currentTimeMillis();
     hostObject.refresh();
 

+ 115 - 63
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -1173,8 +1173,6 @@ public class AmbariManagementControllerImpl implements
       for (State newState : changedScHosts.get(compName).keySet()) {
         for (ServiceComponentHost scHost :
             changedScHosts.get(compName).get(newState)) {
-          // FIXME file configs with appropriate configs for this
-          // scHost instance if action is install or start
           RoleCommand roleCommand;
           State oldSchState = scHost.getDesiredState();
           ServiceComponentHostEvent event;
@@ -1242,6 +1240,18 @@ public class AmbariManagementControllerImpl implements
       }
     }
 
+    RoleGraph rg = new RoleGraph(rco);
+    rg.build(stage);
+    List<Stage> stages = rg.getStages();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Triggering Action Manager"
+          + ", clusterName=" + cluster.getClusterName()
+          + ", requestId=" + requestId
+          + ", stagesCount=" + stages.size());
+    }
+    actionManager.sendActions(stages);
+
     if (changedServices != null) {
       for (Entry<State, List<Service>> entry : changedServices.entrySet()) {
         State newState = entry.getKey();
@@ -1271,23 +1281,7 @@ public class AmbariManagementControllerImpl implements
       }
     }
 
-    RoleGraph rg = new RoleGraph(rco);
-    rg.build(stage);
-    List<Stage> stages = rg.getStages();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Triggering Action Manager"
-          + ", clusterName=" + cluster.getClusterName()
-          + ", requestId=" + requestId
-          + ", stagesCount=" + stages.size());
-      for (Stage stagetmp: stages) {
-        LOG.debug("Stage tmp dump: " + stagetmp.toString());
-      }
-    }
-    actionManager.sendActions(stages);
-
     return new TrackActionResponse(requestId);
-
   }
 
   private boolean isValidTransition(State oldState, State newState) {
@@ -1730,29 +1724,41 @@ public class AmbariManagementControllerImpl implements
       Service s = cluster.getService(request.getServiceName());
       ServiceComponent sc = s.getServiceComponent(
         request.getComponentName());
+      State oldState = sc.getDesiredState();
+      State newState = null;
+      if (request.getDesiredState() != null) {
+        newState = State.valueOf(request.getDesiredState());
+        if (!newState.isValidDesiredState()) {
+          // FIXME fix with appropriate exception
+          throw new AmbariException("Invalid desired state");
+        }
+      }
 
       if (request.getConfigVersions() != null) {
-        Map<String, Config> updated = new HashMap<String, Config>();
-
-        for (Entry<String,String> entry : request.getConfigVersions().entrySet()) {
-          Config config = cluster.getDesiredConfig(entry.getKey(), entry.getValue());
-          if (null != config) {
-            updated.put(config.getType(), config);
-          }
+        // validate whether changing configs is allowed
+        // FIXME this will need to change for cascading updates
+        // need to check all components and hostcomponents for correct state
+        safeToUpdateConfigsForServiceComponent(sc, oldState, newState);
 
-          if (0 != updated.size()) {
-            sc.updateDesiredConfigs(updated);
-            sc.persist();
+        for (Entry<String,String> entry :
+            request.getConfigVersions().entrySet()) {
+          Config config = cluster.getDesiredConfig(
+              entry.getKey(), entry.getValue());
+          if (null == config) {
+            // throw error for invalid config
+            throw new AmbariException("Trying to update servicecomponent with"
+                + " invalid configs"
+                + ", clusterName=" + cluster.getClusterName()
+                + ", clusterId=" + cluster.getClusterId()
+                + ", serviceName=" + s.getName()
+                + ", componentName=" + sc.getName()
+                + ", invalidConfigType=" + entry.getKey()
+                + ", invalidConfigTag=" + entry.getValue());
           }
         }
-
-        // TODO handle config updates
-        // handle recursive updates to all components and hostcomponents
-        // if different from old desired configs, trigger relevant actions
       }
 
-      if (request.getDesiredState() == null
-          || request.getDesiredState().isEmpty()) {
+      if (newState == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Nothing to do for new updateServiceComponent request"
               + ", clusterName=" + request.getClusterName()
@@ -1763,12 +1769,6 @@ public class AmbariManagementControllerImpl implements
         continue;
       }
 
-      State newState = State.valueOf(request.getDesiredState());
-      if (!newState.isValidDesiredState()) {
-        // TODO fix
-        throw new AmbariException("Invalid desired state");
-      }
-
       if (sc.isClientComponent() &&
           !newState.isValidClientComponentState()) {
         throw new AmbariException("Invalid desired state for a client"
@@ -1864,6 +1864,29 @@ public class AmbariManagementControllerImpl implements
     // TODO if all components reach a common state, should service state be
     // modified?
 
+    for (ServiceComponentRequest request : requests) {
+      Cluster cluster = clusters.getCluster(request.getClusterName());
+      Service s = cluster.getService(request.getServiceName());
+      ServiceComponent sc = s.getServiceComponent(
+        request.getComponentName());
+      if (request.getConfigVersions() != null) {
+        Map<String, Config> updated = new HashMap<String, Config>();
+
+        for (Entry<String,String> entry :
+          request.getConfigVersions().entrySet()) {
+          Config config = cluster.getDesiredConfig(
+              entry.getKey(), entry.getValue());
+          updated.put(config.getType(), config);
+          if (!updated.isEmpty()) {
+            sc.updateDesiredConfigs(updated);
+            sc.persist();
+          }
+        }
+
+        // TODO handle recursive updates to all hostcomponents
+      }
+    }
+
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 
     return doStageCreation(cluster, null,
@@ -1987,29 +2010,42 @@ public class AmbariManagementControllerImpl implements
         request.getComponentName());
       ServiceComponentHost sch = sc.getServiceComponentHost(
         request.getHostname());
+      State oldState = sch.getDesiredState();
+      State newState = null;
+      if (request.getDesiredState() != null) {
+        newState = State.valueOf(request.getDesiredState());
+        if (!newState.isValidDesiredState()) {
+          // FIXME fix with appropriate exception
+          throw new AmbariException("Invalid desired state");
+        }
+      }
 
       if (request.getConfigVersions() != null) {
-        Map<String, Config> updated = new HashMap<String, Config>();
-
-        for (Entry<String,String> entry : request.getConfigVersions().entrySet()) {
-          Config config = cluster.getDesiredConfig(entry.getKey(), entry.getValue());
-          if (null != config) {
-            updated.put(config.getType(), config);
-          }
+        // validate whether changing configs is allowed
+        // FIXME this will need to change for cascading updates
+        // need to check all components and hostcomponents for correct state
+        safeToUpdateConfigsForServiceComponentHost(sch, oldState, newState);
 
-          if (0 != updated.size()) {
-            sch.updateDesiredConfigs(updated);
-            sch.persist();
+        for (Entry<String,String> entry :
+            request.getConfigVersions().entrySet()) {
+          Config config = cluster.getDesiredConfig(
+              entry.getKey(), entry.getValue());
+          if (null == config) {
+            // throw error for invalid config
+            throw new AmbariException("Trying to update servicecomponenthost"
+                + " with invalid configs"
+                + ", clusterName=" + cluster.getClusterName()
+                + ", clusterId=" + cluster.getClusterId()
+                + ", serviceName=" + s.getName()
+                + ", componentName=" + sc.getName()
+                + ", hostname=" + sch.getHostName()
+                + ", invalidConfigType=" + entry.getKey()
+                + ", invalidConfigTag=" + entry.getValue());
           }
         }
-
-        // TODO handle config updates
-        // handle recursive updates to all components and hostcomponents
-        // if different from old desired configs, trigger relevant actions
       }
 
-      if (request.getDesiredState() == null
-          || request.getDesiredState().isEmpty()) {
+      if (newState == null) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Nothing to do for new updateServiceComponentHost request"
               + ", clusterName=" + request.getClusterName()
@@ -2021,12 +2057,6 @@ public class AmbariManagementControllerImpl implements
         continue;
       }
 
-      State newState = State.valueOf(request.getDesiredState());
-      if (!newState.isValidDesiredState()) {
-        // TODO fix
-        throw new AmbariException("Invalid desired state");
-      }
-
       if (sc.isClientComponent() &&
           !newState.isValidClientComponentState()) {
         throw new AmbariException("Invalid desired state for a client"
@@ -2086,11 +2116,33 @@ public class AmbariManagementControllerImpl implements
           + " for a set of service components at the same time");
     }
 
-    // TODO fix
+    // TODO fix live state handling
     // currently everything is being done based on desired state
     // at some point do we need to do stuff based on live state?
 
     // TODO additional validation?
+    for (ServiceComponentHostRequest request : requests) {
+      Cluster cluster = clusters.getCluster(request.getClusterName());
+      Service s = cluster.getService(request.getServiceName());
+      ServiceComponent sc = s.getServiceComponent(
+        request.getComponentName());
+      ServiceComponentHost sch = sc.getServiceComponentHost(
+        request.getHostname());
+      if (request.getConfigVersions() != null) {
+        Map<String, Config> updated = new HashMap<String, Config>();
+
+        for (Entry<String,String> entry : request.getConfigVersions().entrySet()) {
+          Config config = cluster.getDesiredConfig(
+              entry.getKey(), entry.getValue());
+          updated.put(config.getType(), config);
+
+          if (!updated.isEmpty()) {
+            sch.updateDesiredConfigs(updated);
+            sch.persist();
+          }
+        }
+      }
+    }
 
     Cluster cluster = clusters.getCluster(clusterNames.iterator().next());
 

+ 7 - 5
ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java

@@ -218,7 +218,8 @@ public class HostImpl implements Host {
       LOG.debug("Host transition to host status updates received state"
           + ", host=" + e.getHostName()
           + ", heartbeatTime=" + e.getTimestamp());
-      host.setHealthStatus(new HostHealthStatus(HealthStatus.HEALTHY, host.getHealthStatus().getHealthReport()));
+      host.setHealthStatus(new HostHealthStatus(HealthStatus.HEALTHY,
+          host.getHealthStatus().getHealthReport()));
     }
   }
 
@@ -296,10 +297,11 @@ public class HostImpl implements Host {
   public void importHostInfo(HostInfo hostInfo) {
     try {
       writeLock.lock();
-      if (hostInfo.getFQDN() != null
-          && !hostInfo.getFQDN().isEmpty()
-          && !hostInfo.getFQDN().equals(getHostName())) {
-        setHostName(hostInfo.getFQDN());
+      String fqdn = hostInfo.getFQDN();
+      if (fqdn != null
+          && !fqdn.isEmpty()
+          && !fqdn.equals(getHostName())) {
+        setHostName(hostInfo.getHostName());
       }
       if (hostInfo.getIPAddress() != null
           && !hostInfo.getIPAddress().isEmpty()) {

+ 1 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java

@@ -1192,6 +1192,7 @@ public class AmbariManagementControllerTest {
     configs.put(c1.getType(), c1);
     configs.put(c2.getType(), c2);
     s1.updateDesiredConfigs(configs);
+    s1.persist();
 
     ServiceRequest r1 = new ServiceRequest(clusterName, serviceName, null,
         State.INSTALLED.toString());