Browse Source

AMBARI-5761. 2000-node cluster testing: during install phase of cluster deployment, install tasks were stuck in PENDING state. (mpapirkovskyy)

Myroslav Papirkovskyy 11 years ago
parent
commit
949ecd21b4

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java

@@ -19,6 +19,7 @@ package org.apache.ambari.server.actionmanager;
 
 import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
 
 import java.util.Collection;
 import java.util.List;
@@ -110,6 +111,16 @@ public interface ActionDBAccessor {
    */
   public long getLastPersistedRequestIdWhenInitialized();
 
+  /**
+   * Bulk update scheduled commands
+   */
+  void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands);
+
+  /**
+   * Bulk abort commands
+   */
+  void bulkAbortHostRole(Stage s, List<ExecutionCommand> commands);
+
   /**
    * Updates scheduled stage.
    */

+ 18 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java

@@ -24,6 +24,7 @@ import com.google.inject.Singleton;
 import com.google.inject.name.Named;
 import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
@@ -411,6 +412,23 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
     return requestId;
   }
 
+
+  @Override
+  @Transactional
+  public void bulkHostRoleScheduled(Stage s, List<ExecutionCommand> commands) {
+    for (ExecutionCommand command : commands) {
+      hostRoleScheduled(s, command.getHostname(), command.getRole());
+    }
+  }
+
+  @Override
+  @Transactional
+  public void bulkAbortHostRole(Stage s, List<ExecutionCommand> commands) {
+    for (ExecutionCommand command : commands) {
+      abortHostRole(command.getHostname(), s.getRequestId(), s.getStageId(), command.getRole());
+    }
+  }
+
   @Override
   @Transactional
   public void hostRoleScheduled(Stage s, String hostname, String roleStr) {

+ 119 - 12
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -21,12 +21,20 @@ import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+import com.google.inject.persist.UnitOfWork;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
@@ -46,16 +54,16 @@ import org.apache.ambari.server.state.HostState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
 import org.apache.ambari.server.utils.StageUtils;
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.reflect.TypeToken;
-import com.google.inject.persist.UnitOfWork;
+
 
 /**
  * This class encapsulates the action scheduler thread.
@@ -164,6 +172,7 @@ class ActionScheduler implements Runnable {
       List<Stage> stages = db.getStagesInProgress();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Scheduler wakes up");
+        LOG.debug("Processing {} in progress stages ", stages.size());
       }
       if (stages == null || stages.isEmpty()) {
         //Nothing to do
@@ -172,15 +181,18 @@ class ActionScheduler implements Runnable {
         }
         return;
       }
-
+      int i_stage = 0;
       for (Stage s : stages) {
         // Check if we can process this stage in parallel with another stages
+        i_stage ++;
 
         long requestId = s.getRequestId();
         // Convert to string to avoid glitches with boxing/unboxing
         String requestIdStr = String.valueOf(requestId);
+        LOG.debug("==> STAGE_i = " + i_stage + "(requestId=" + requestIdStr + ",StageId=" + s.getStageId() + ")");
         if (runningRequestIds.contains(requestIdStr)) {
           // We don't want to process different stages from the same request in parallel
+          LOG.info("==> We don't want to process different stages from the same request in parallel" );
           continue;
         } else {
           runningRequestIds.add(requestIdStr);
@@ -233,7 +245,11 @@ class ActionScheduler implements Runnable {
           return;
         }
 
+        List<ExecutionCommand> commandsToStart = new ArrayList<ExecutionCommand>();
+        List<ExecutionCommand> commandsToUpdate = new ArrayList<ExecutionCommand>();
+
         //Schedule what we have so far
+
         for (ExecutionCommand cmd : commandsToSchedule) {
           if (Role.valueOf(cmd.getRole()).equals(Role.AMBARI_SERVER_ACTION)) {
             /**
@@ -245,15 +261,46 @@ class ActionScheduler implements Runnable {
              */
             executeServerAction(s, cmd);
           } else {
-            try {
-              scheduleHostRole(s, cmd);
-            } catch (InvalidStateTransitionException e) {
-              LOG.warn("Could not schedule host role " + cmd.toString(), e);
-              db.abortHostRole(cmd.getHostname(), s.getRequestId(), s.getStageId(), cmd.getRole());
+            processHostRole(s, cmd, commandsToStart, commandsToUpdate);
+          }
+        }
+
+        LOG.debug("==> Commands to start: {}", commandsToStart.size());
+        LOG.debug("==> Commands to update: {}", commandsToUpdate.size());
+
+        //Multimap is analog of Map<Object, List<Object>> but allows to avoid nested loop
+        ListMultimap<String, ServiceComponentHostEvent> eventMap = formEventMap(s, commandsToStart);
+        LOG.debug("==> processing {} serviceComponentHostEvents...", eventMap.size());
+        List<ServiceComponentHostEvent> failedEvents =
+            fsmObject.getCluster(s.getClusterName()).processServiceComponentHostEvents(eventMap);
+        LOG.debug("==> {} events failed.", failedEvents.size());
+
+        List<ExecutionCommand> commandsToAbort = new ArrayList<ExecutionCommand>();
+
+        for (Iterator<ExecutionCommand> iterator = commandsToUpdate.iterator(); iterator.hasNext(); ) {
+          ExecutionCommand cmd = iterator.next();
+          for (ServiceComponentHostEvent event : failedEvents) {
+            if (StringUtils.equals(event.getHostName(), cmd.getHostname()) &&
+                StringUtils.equals(event.getServiceComponentName(), cmd.getRole())) {
+              iterator.remove();
+              commandsToAbort.add(cmd);
+              break;
             }
           }
         }
 
+        LOG.debug("==> Scheduling {} tasks...", commandsToUpdate.size());
+        db.bulkHostRoleScheduled(s, commandsToUpdate);
+
+        LOG.debug("==> Aborting {} tasks...", commandsToAbort.size());
+        db.bulkAbortHostRole(s, commandsToAbort);
+
+        LOG.debug("==> Adding {} tasks to queue...", commandsToUpdate.size());
+        for (ExecutionCommand cmd : commandsToUpdate) {
+          actionQueue.enqueue(cmd.getHostname(), cmd);
+        }
+        LOG.debug("==> Finished.");
+
         if (! configuration.getParallelStageExecution()) { // If disabled
           return;
         }
@@ -262,6 +309,7 @@ class ActionScheduler implements Runnable {
       requestsInProgress.retainAll(runningRequestIds);
 
     } finally {
+      LOG.debug("Scheduler finished work.");
       unitOfWork.end();
     }
   }
@@ -272,6 +320,8 @@ class ActionScheduler implements Runnable {
    */
   private void executeServerAction(Stage s, ExecutionCommand cmd) {
     try {
+      LOG.trace("Executing server action: request_id={}, stage_id={}, task_id={}",
+        s.getRequestId(), s.getStageId(), cmd.getTaskId());
       long now = System.currentTimeMillis();
       String hostName = cmd.getHostname();
       String roleName = cmd.getRole();
@@ -368,6 +418,7 @@ class ActionScheduler implements Runnable {
    */
   private Map<String, RoleStats> processInProgressStage(Stage s,
       List<ExecutionCommand> commandsToSchedule) throws AmbariException {
+    LOG.debug("==> Collecting commands to schedule...");
     // Map to track role status
     Map<String, RoleStats> roleStats = initRoleStats(s);
     long now = System.currentTimeMillis();
@@ -384,12 +435,17 @@ class ActionScheduler implements Runnable {
     for (String host : s.getHosts()) {
       List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
       Host hostObj = fsmObject.getHost(host);
-
+      int i_my = 0;
+      LOG.trace("===>host=" + host);
       for(ExecutionCommandWrapper wrapper : commandWrappers) {
         ExecutionCommand c = wrapper.getExecutionCommand();
         String roleStr = c.getRole();
         HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
-
+        i_my ++;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Host task " + i_my + ") id = " + c.getTaskId() + " status = " + status.toString() +
+            " (role=" + roleStr + "), roleCommand = "+ c.getRoleCommand());
+        }
         boolean hostDeleted = false;
         if (null != cluster) {
           Service svc = null;
@@ -451,19 +507,23 @@ class ActionScheduler implements Runnable {
             }
 
             // Dequeue command
+            LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
             actionQueue.dequeue(host, c.getCommandId());
           } else {
             // reschedule command
             commandsToSchedule.add(c);
+            LOG.trace("===> commandsToSchedule(reschedule)=" + commandsToSchedule.size());
           }
         } else if (status.equals(HostRoleStatus.PENDING)) {
           //Need to schedule first time
           commandsToSchedule.add(c);
+          LOG.trace("===>commandsToSchedule(first_time)=" + commandsToSchedule.size());
         }
 
         this.updateRoleStats(status, roleStats.get(roleStr));
       }
     }
+    LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
     return roleStats;
   }
 
@@ -581,6 +641,53 @@ class ActionScheduler implements Runnable {
     return false;
   }
 
+  private ListMultimap<String, ServiceComponentHostEvent> formEventMap(Stage s, List<ExecutionCommand> commands) {
+    ListMultimap<String, ServiceComponentHostEvent> serviceEventMap = ArrayListMultimap.create();
+    for (ExecutionCommand cmd : commands) {
+      String hostname = cmd.getHostname();
+      String roleStr = cmd.getRole();
+      if (RoleCommand.ACTIONEXECUTE != cmd.getRoleCommand()) {
+          serviceEventMap.put(cmd.getServiceName(), s.getFsmEvent(hostname, roleStr).getEvent());
+      }
+    }
+    return serviceEventMap;
+  }
+
+  private void processHostRole(Stage s, ExecutionCommand cmd, List<ExecutionCommand> commandsToStart,
+                               List<ExecutionCommand> commandsToUpdate)
+    throws AmbariException {
+    long now = System.currentTimeMillis();
+    String roleStr = cmd.getRole();
+    String hostname = cmd.getHostname();
+
+    // start time is -1 if host role command is not started yet
+    if (s.getStartTime(hostname, roleStr) < 0) {
+
+      commandsToStart.add(cmd);
+      s.setStartTime(hostname,roleStr, now);
+      s.setHostRoleStatus(hostname, roleStr, HostRoleStatus.QUEUED);
+    }
+    s.setLastAttemptTime(hostname, roleStr, now);
+    s.incrementAttemptCount(hostname, roleStr);
+    /** change the hostname in the command for the host itself **/
+    cmd.setHostname(hostsMap.getHostMap(hostname));
+
+
+    //Try to get clusterHostInfo from cache
+    String stagePk = s.getStageId() + "-" + s.getRequestId();
+    Map<String, Set<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(stagePk);
+
+    if (clusterHostInfo == null) {
+      Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
+      clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type);
+      clusterHostInfoCache.put(stagePk, clusterHostInfo);
+    }
+
+    cmd.setClusterHostInfo(clusterHostInfo);
+
+    commandsToUpdate.add(cmd);
+  }
+
   private void scheduleHostRole(Stage s, ExecutionCommand cmd)
       throws InvalidStateTransitionException, AmbariException {
     long now = System.currentTimeMillis();

+ 1 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -1814,6 +1814,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     List<Stage> stages = doStageCreation(requestStages, cluster, changedServices, changedComponents,
         changedHosts, requestParameters, requestProperties,
         runSmokeTest, reconfigureClients);
+    LOG.debug("Created {} stages", ((stages != null) ? stages.size() : 0));
 
     requestStages.addStages(stages);
     updateServiceStates(changedServices, changedComponents, changedHosts, ignoredHosts);

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java

@@ -318,6 +318,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
       List<Long> requestIds = actionManager.getRequestsByStatus(status,
         maxResults != null ? maxResults : BaseRequest.DEFAULT_PAGE_SIZE,
         ascOrder != null ? ascOrder : false);
+      LOG.debug("List<Long> requestIds = actionManager.getRequestsByStatus = {}", requestIds.size());
 
       response.addAll(getRequestResources(clusterName, actionManager, requestIds,
           requestedPropertyIds));
@@ -340,6 +341,7 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
                                                    Set<String> requestedPropertyIds) {
 
     List<org.apache.ambari.server.actionmanager.Request> requests = actionManager.getRequests(requestIds);
+    LOG.debug("requests = actionManager.getRequests(requestIds)={}", requests.size());
 
     Map<Long, Resource> resourceMap = new HashMap<Long, Resource>();
 
@@ -417,6 +419,11 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
 
     int inProgressTaskCount = taskCount - completedTaskCount - queuedTaskCount - pendingTaskCount;
 
+    LOG.debug("taskCount={}, inProgressTaskCount={}, completedTaskCount={}, queuedTaskCount={}, " +
+      "pendingTaskCount={}, failedTaskCount={}, abortedTaskCount={}, timedOutTaskCount={}",
+      taskCount, inProgressTaskCount, completedTaskCount, queuedTaskCount, pendingTaskCount,
+      failedTaskCount, abortedTaskCount, timedOutTaskCount);
+
     // determine request status
     HostRoleStatus requestStatus = failedTaskCount > 0 ? HostRoleStatus.FAILED :
         abortedTaskCount > 0 ? HostRoleStatus.ABORTED :

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Cluster.java

@@ -23,9 +23,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 
+import com.google.common.collect.ListMultimap;
+import com.google.inject.persist.Transactional;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.ClusterResponse;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 
 public interface Cluster {
@@ -293,4 +296,12 @@ public interface Cluster {
    * @throws AmbariException
    */
   public void deleteRequestExecution(Long id) throws AmbariException;
+
+  /**
+   * Bulk handle service component host events
+   *
+   * @param eventMap serviceName - event mapping
+   * @return list of failed events
+   */
+  List<ServiceComponentHostEvent> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap);
 }

+ 33 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClusterImpl.java

@@ -36,6 +36,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.persistence.RollbackException;
 
+import com.google.common.collect.ListMultimap;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceNotFoundException;
@@ -65,12 +66,14 @@ import org.apache.ambari.server.state.MaintenanceState;
 import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostEvent;
 import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.State;
 import org.apache.ambari.server.state.ClusterHealthReport;
 import org.apache.ambari.server.state.configgroup.ConfigGroup;
 import org.apache.ambari.server.state.configgroup.ConfigGroupFactory;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.scheduler.RequestExecution;
 import org.apache.ambari.server.state.scheduler.RequestExecutionFactory;
 import org.slf4j.Logger;
@@ -1426,6 +1429,36 @@ public class ClusterImpl implements Cluster {
     return getHostsDesiredConfigs(hostnames);
   }
 
+  @Transactional
+  @Override
+  public List<ServiceComponentHostEvent> processServiceComponentHostEvents(ListMultimap<String, ServiceComponentHostEvent> eventMap) {
+    List<ServiceComponentHostEvent> failedEvents = new ArrayList<ServiceComponentHostEvent>();
+
+    clusterGlobalLock.readLock().lock();
+    try {
+      for (Entry<String, ServiceComponentHostEvent> entry : eventMap.entries()) {
+        String serviceName = entry.getKey();
+        ServiceComponentHostEvent event = entry.getValue();
+        try {
+          Service service = getService(serviceName);
+          ServiceComponent serviceComponent = service.getServiceComponent(event.getServiceComponentName());
+          ServiceComponentHost serviceComponentHost = serviceComponent.getServiceComponentHost(event.getHostName());
+          serviceComponentHost.handleEvent(event);
+        } catch (AmbariException e) {
+          LOG.error("ServiceComponentHost lookup exception ", e);
+          failedEvents.add(event);
+        } catch (InvalidStateTransitionException e) {
+          LOG.error("Invalid transition ", e);
+          failedEvents.add(event);
+        }
+      }
+    } finally {
+      clusterGlobalLock.readLock().unlock();
+    }
+
+    return failedEvents;
+  }
+
   private ClusterHealthReport getClusterHealthReport() throws AmbariException {
 
     int staleConfigsHosts = 0;