瀏覽代碼

AMBARI-5036. Secured: Start All Services task got stuck forever (dlysnichenko)

Lisnichenko Dmitro 11 年之前
父節點
當前提交
bb53d0df16

+ 6 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessor.java

@@ -95,6 +95,12 @@ public interface ActionDBAccessor {
    */
   public void abortHostRole(String host, long requestId, long stageId, String role);
 
+  /**
+   * Mark the task as to have been aborted. Reason should be specified manually.
+   */
+  public void abortHostRole(String host, long requestId, long stageId,
+                            String role, String reason);
+
   /**
    * Return the last persisted Request ID as seen when the DBAccessor object
    * was initialized.

+ 8 - 1
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java

@@ -388,9 +388,16 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
 
   @Override
   public void abortHostRole(String host, long requestId, long stageId, String role) {
+    String reason = "Host Role in invalid state";
+    abortHostRole(host, requestId, stageId, role, reason);
+  }
+
+  @Override
+  public void abortHostRole(String host, long requestId, long stageId,
+                            String role, String reason) {
     CommandReport report = new CommandReport();
     report.setExitCode(999);
-    report.setStdErr("Host Role in invalid state");
+    report.setStdErr(reason);
     report.setStdOut("");
     report.setStatus("ABORTED");
     updateHostRoleState(host, requestId, stageId, role, report);

+ 35 - 6
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.ServiceComponentNotFoundException;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.CommandReport;
@@ -382,12 +383,36 @@ class ActionScheduler implements Runnable {
         ExecutionCommand c = wrapper.getExecutionCommand();
         String roleStr = c.getRole();
         HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
-
-        // Process command timeouts
-        if (timeOutActionNeeded(status, s, hostObj, roleStr, now,
-          taskTimeout)) {
+        Service svc = cluster.getService(c.getServiceName());
+        ServiceComponent svcComp = svc.getServiceComponent(roleStr);
+
+        // Check that service host component is not deleted
+        Map<String, ServiceComponentHost> scHosts =
+                svcComp.getServiceComponentHosts();
+        if (! scHosts.containsKey(host)) {
+          String message = String.format(
+                  "Service component host not found when trying to " +
+                          "schedule an execution command. " +
+                          "The most probable reason " +
+                          "for that is that host component " +
+                          "has been deleted recently. "+
+                          "The command has been aborted and dequeued. " +
+                          "Execution command details: " +
+                          "cluster=%s; host=%s; service=%s; component=%s; " +
+                          "cmdId: %s; taskId: %s; roleCommand: %s",
+                  c.getClusterName(), host, svcComp.getServiceName(),
+                  svcComp.getName(),
+                  c.getCommandId(), c.getTaskId(), c.getRoleCommand());
+          LOG.warn(message);
+          // Abort the command itself
+          db.abortHostRole(host, s.getRequestId(),
+                  s.getStageId(), c.getRole(), message);
+          status = HostRoleStatus.ABORTED;
+        } else if (timeOutActionNeeded(status, s, hostObj, roleStr, now,
+                taskTimeout)) {
+          // Process command timeouts
           LOG.info("Host:" + host + ", role:" + roleStr + ", actionId:"
-              + s.getActionId() + " timed out");
+                  + s.getActionId() + " timed out");
           if (s.getAttemptCount(host, roleStr) >= maxAttempts) {
             LOG.warn("Host:" + host + ", role:" + roleStr + ", actionId:"
                 + s.getActionId() + " expired");
@@ -464,6 +489,10 @@ class ActionScheduler implements Runnable {
     } catch (ServiceComponentNotFoundException scnex) {
       LOG.debug(componentName + " associated with service " + serviceName +
         " is not a service component, assuming it's an action.");
+    } catch (ServiceComponentHostNotFoundException e) {
+      String msg = String.format("Service component host %s not found, " +
+              "unable to transition to failed state.", componentName);
+      LOG.warn(msg, e);
     } catch (InvalidStateTransitionException e) {
       if (ignoreTransitionException) {
         LOG.debug("Unable to transition to failed state.", e);
@@ -538,7 +567,7 @@ class ActionScheduler implements Runnable {
           Service svc = c.getService(cmd.getServiceName());
           ServiceComponent svcComp = svc.getServiceComponent(roleStr);
           ServiceComponentHost svcCompHost =
-              svcComp.getServiceComponentHost(hostname);
+                  svcComp.getServiceComponentHost(hostname);
           svcCompHost.handleEvent(s.getFsmEvent(hostname, roleStr).getEvent());
         } catch (ServiceComponentNotFoundException scnex) {
           LOG.debug("Not a service component, assuming its an action");

+ 0 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -151,7 +151,6 @@ public class HeartBeatHandler {
     hostResponseIds.put(hostname, currentResponseId);
     hostResponses.put(hostname, response);
 
-    HostState hostState = hostObject.getState();
     // If the host is waiting for component status updates, notify it
     if (heartbeat.componentStatus.size() > 0
         && hostObject.getState().equals(HostState.WAITING_FOR_HOST_STATUS_UPDATES)) {

+ 184 - 16
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

@@ -32,11 +32,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.reflect.TypeToken;
 import com.google.inject.persist.UnitOfWork;
 import junit.framework.Assert;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
+import org.apache.ambari.server.ServiceComponentHostNotFoundException;
 import org.apache.ambari.server.actionmanager.ActionScheduler.RoleStats;
 import org.apache.ambari.server.agent.ActionQueue;
 import org.apache.ambari.server.agent.AgentCommand;
@@ -87,7 +91,7 @@ public class TestActionScheduler {
     
     Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
     Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
-    
+
     ActionQueue aq = new ActionQueue();
     Properties properties = new Properties();
     Configuration conf = new Configuration(properties);
@@ -104,6 +108,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     when(fsm.getHost(anyString())).thenReturn(host);
     when(host.getState()).thenReturn(HostState.HEALTHY);
     when(host.getHostName()).thenReturn(hostname);
@@ -179,6 +188,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     when(fsm.getHost(anyString())).thenReturn(host);
     when(host.getState()).thenReturn(HostState.HEALTHY);
     when(host.getHostName()).thenReturn(hostname);
@@ -239,6 +253,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     when(fsm.getHost(anyString())).thenReturn(host);
     when(host.getState()).thenReturn(HostState.HEARTBEAT_LOST);
     when(host.getHostName()).thenReturn(hostname);
@@ -293,6 +312,12 @@ public class TestActionScheduler {
     String hostname2 = "host2";
     Host host1 = mock(Host.class);
     Host host2 = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname1, sch1);
+    hosts.put(hostname2, sch2);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     when(fsm.getHost(hostname1)).thenReturn(host1);
     when(fsm.getHost(hostname2)).thenReturn(host2);
     when(host1.getState()).thenReturn(HostState.HEARTBEAT_LOST);
@@ -434,6 +459,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
     String hostname = "ahost.ambari.apache.org";
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
     payload.put(ServerAction.PayloadName.CLUSTER_NAME, "cluster1");
@@ -491,6 +521,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
     String hostname = "ahost.ambari.apache.org";
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     List<Stage> stages = new ArrayList<Stage>();
     Map<String, String> payload = new HashMap<String, String>();
     payload.put(ServerAction.PayloadName.CURRENT_STACK_VERSION, "HDP-0.2");
@@ -567,6 +602,14 @@ public class TestActionScheduler {
     String hostname2 = "bhost.ambari.apache.org";
     String hostname3 = "chost.ambari.apache.org";
     String hostname4 = "chost.ambari.apache.org";
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname1, sch);
+    hosts.put(hostname2, sch);
+    hosts.put(hostname3, sch);
+    hosts.put(hostname4, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     List<Stage> stages = new ArrayList<Stage>();
     stages.add(
             getStageWithSingleTask(
@@ -638,6 +681,14 @@ public class TestActionScheduler {
     String hostname2 = "bhost.ambari.apache.org";
     String hostname3 = "chost.ambari.apache.org";
     String hostname4 = "chost.ambari.apache.org";
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname1, sch);
+    hosts.put(hostname2, sch);
+    hosts.put(hostname3, sch);
+    hosts.put(hostname4, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     List<Stage> stages = new ArrayList<Stage>();
     stages.add(
             getStageWithSingleTask(
@@ -705,6 +756,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
 
     String hostname = "ahost.ambari.apache.org";
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     final List<Stage> stages = new ArrayList<Stage>();
     stages.add(
         getStageWithSingleTask(
@@ -822,10 +878,19 @@ public class TestActionScheduler {
     when(serviceObj.getServiceComponent(anyString())).thenReturn(scomp);
     when(scomp.getServiceComponentHost(anyString())).thenReturn(sch);
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    String host1 = "host1";
+    String host2 = "host2";
     Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(host1, sch);
+    hosts.put(host2, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     when(fsm.getHost(anyString())).thenReturn(host);
     when(host.getState()).thenReturn(HostState.HEALTHY);
-    when(host.getHostName()).thenReturn("host1");
+    when(host.getHostName()).thenReturn(host1);
 
 
     final List<Stage> stages = new ArrayList<Stage>();
@@ -835,25 +900,25 @@ public class TestActionScheduler {
     stage.setStageId(1);
 
     addHostRoleExecutionCommand(now, stage, Role.SQOOP, Service.Type.SQOOP,
-        RoleCommand.INSTALL, "host1", "cluster1");
+        RoleCommand.INSTALL, host1, "cluster1");
 
     addHostRoleExecutionCommand(now, stage, Role.OOZIE_CLIENT, Service.Type.OOZIE,
-        RoleCommand.INSTALL, "host1", "cluster1");
+        RoleCommand.INSTALL, host1, "cluster1");
 
     addHostRoleExecutionCommand(now, stage, Role.MAPREDUCE_CLIENT, Service.Type.MAPREDUCE,
-        RoleCommand.INSTALL, "host1", "cluster1");
+        RoleCommand.INSTALL, host1, "cluster1");
 
     addHostRoleExecutionCommand(now, stage, Role.HBASE_CLIENT, Service.Type.HBASE,
-        RoleCommand.INSTALL, "host1", "cluster1");
+        RoleCommand.INSTALL, host1, "cluster1");
 
     addHostRoleExecutionCommand(now, stage, Role.GANGLIA_MONITOR, Service.Type.GANGLIA,
-        RoleCommand.INSTALL, "host1", "cluster1");
+        RoleCommand.INSTALL, host1, "cluster1");
 
     addHostRoleExecutionCommand(now, stage, Role.HBASE_CLIENT, Service.Type.HBASE,
-        RoleCommand.INSTALL, "host2", "cluster1");
+        RoleCommand.INSTALL, host2, "cluster1");
 
     addHostRoleExecutionCommand(now, stage, Role.GANGLIA_MONITOR, Service.Type.GANGLIA,
-        RoleCommand.INSTALL, "host2", "cluster1");
+        RoleCommand.INSTALL, host2, "cluster1");
 
     stages.add(stage);
 
@@ -865,13 +930,13 @@ public class TestActionScheduler {
       stage.getOrderedHostRoleCommands().get(index).setStatus(statusesAtIterOne[index]);
     }
 
-    stage.setLastAttemptTime("host1", Role.SQOOP.toString(), now);
-    stage.setLastAttemptTime("host1", Role.MAPREDUCE_CLIENT.toString(), now);
-    stage.setLastAttemptTime("host1", Role.OOZIE_CLIENT.toString(), now);
-    stage.setLastAttemptTime("host1", Role.GANGLIA_MONITOR.toString(), now);
-    stage.setLastAttemptTime("host1", Role.HBASE_CLIENT.toString(), now);
-    stage.setLastAttemptTime("host2", Role.GANGLIA_MONITOR.toString(), now);
-    stage.setLastAttemptTime("host2", Role.HBASE_CLIENT.toString(), now);
+    stage.setLastAttemptTime(host1, Role.SQOOP.toString(), now);
+    stage.setLastAttemptTime(host1, Role.MAPREDUCE_CLIENT.toString(), now);
+    stage.setLastAttemptTime(host1, Role.OOZIE_CLIENT.toString(), now);
+    stage.setLastAttemptTime(host1, Role.GANGLIA_MONITOR.toString(), now);
+    stage.setLastAttemptTime(host1, Role.HBASE_CLIENT.toString(), now);
+    stage.setLastAttemptTime(host2, Role.GANGLIA_MONITOR.toString(), now);
+    stage.setLastAttemptTime(host2, Role.HBASE_CLIENT.toString(), now);
 
     ActionDBAccessor db = mock(ActionDBAccessor.class);
 
@@ -1246,6 +1311,11 @@ public class TestActionScheduler {
     when(serviceObj.getCluster()).thenReturn(oneClusterMock);
     String hostname = "ahost.ambari.apache.org";
     Host host = mock(Host.class);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname, sch);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
     when(fsm.getHost(anyString())).thenReturn(host);
     when(host.getState()).thenReturn(HostState.HEALTHY);
     when(host.getHostName()).thenReturn(hostname);
@@ -1281,4 +1351,102 @@ public class TestActionScheduler {
     assertEquals(clusterHostInfo2, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
 
   }
+
+
+  /**
+   * Checks what happens when stage has an execution command for
+   * host component that has been recently deleted
+   * @throws Exception
+   */
+  @Test
+  public void testCommandAbortForDeletedComponent() throws Exception {
+    ActionQueue aq = new ActionQueue();
+    Properties properties = new Properties();
+    Configuration conf = new Configuration(properties);
+    Clusters fsm = mock(Clusters.class);
+    Cluster oneClusterMock = mock(Cluster.class);
+    Service serviceObj = mock(Service.class);
+    ServiceComponent scomp = mock(ServiceComponent.class);
+    ServiceComponent scWithDeletedSCH = mock(ServiceComponent.class);
+    ServiceComponentHost sch1 = mock(ServiceComponentHost.class);
+    String hostname1 = "host1";
+    Host host1 = mock(Host.class);
+    when(fsm.getHost(hostname1)).thenReturn(host1);
+    when(host1.getState()).thenReturn(HostState.HEALTHY);
+    when(host1.getHostName()).thenReturn(hostname1);
+    when(scomp.getServiceComponentHost(hostname1)).thenReturn(sch1);
+    HashMap<String, ServiceComponentHost> hosts =
+            new HashMap<String, ServiceComponentHost>();
+    hosts.put(hostname1, sch1);
+    when(scomp.getServiceComponentHosts()).thenReturn(hosts);
+
+    UnitOfWork unitOfWork = mock(UnitOfWork.class);
+    when(fsm.getCluster(anyString())).thenReturn(oneClusterMock);
+    when(oneClusterMock.getService(anyString())).thenReturn(serviceObj);
+    when(serviceObj.getServiceComponent(Role.HBASE_MASTER.toString())).
+            thenReturn(scWithDeletedSCH);
+    when(serviceObj.getServiceComponent(Role.HBASE_REGIONSERVER.toString())).
+            thenReturn(scomp);
+    when(scWithDeletedSCH.getServiceComponentHost(anyString())).
+            thenThrow(new ServiceComponentHostNotFoundException("dummyCluster",
+                "dummyService", "dummyComponent", "dummyHostname"));
+    when(serviceObj.getCluster()).thenReturn(oneClusterMock);
+
+    final List<Stage> stages = new ArrayList<Stage>();
+    Stage stage1 = new Stage(1, "/tmp", "cluster1", "stageWith2Tasks",
+            CLUSTER_HOST_INFO);
+    addInstallTaskToStage(stage1, hostname1, "cluster1", Role.HBASE_MASTER,
+            RoleCommand.INSTALL, Service.Type.HBASE, 1);
+    addInstallTaskToStage(stage1, hostname1, "cluster1", Role.HBASE_REGIONSERVER,
+            RoleCommand.INSTALL, Service.Type.HBASE, 2);
+    stages.add(stage1);
+
+    ActionDBAccessor db = mock(ActionDBAccessor.class);
+
+    when(db.getStagesInProgress()).thenReturn(stages);
+
+    ActionScheduler scheduler = new ActionScheduler(100, 50000, db, aq, fsm, 3,
+            new HostsMap((String) null), null, unitOfWork, conf);
+
+    final CountDownLatch abortCalls = new CountDownLatch(2);
+
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+      Long requestId = (Long) invocation.getArguments()[0];
+      for (Stage stage : stages) {
+        if (requestId.equals(stage.getRequestId())) {
+          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
+            if (command.getStatus() == HostRoleStatus.QUEUED ||
+                    command.getStatus() == HostRoleStatus.IN_PROGRESS ||
+                    command.getStatus() == HostRoleStatus.PENDING) {
+              command.setStatus(HostRoleStatus.ABORTED);
+            }
+          }
+        }
+      }
+      abortCalls.countDown();
+      return null;
+      }
+    }).when(db).abortOperation(anyLong());
+
+    scheduler.setTaskTimeoutAdjustment(false);
+    // Start the thread
+    scheduler.start();
+
+    long timeout = 60;
+    abortCalls.await(timeout, TimeUnit.SECONDS);
+
+    Assert.assertEquals(HostRoleStatus.ABORTED,
+            stages.get(0).getHostRoleStatus(hostname1, "HBASE_MASTER"));
+    Assert.assertEquals(HostRoleStatus.ABORTED,
+            stages.get(0).getHostRoleStatus(hostname1, "HBASE_REGIONSERVER"));
+
+    // If regression occured, scheduler thread would fail with an exception
+    // instead of aborting request
+    verify(db, times(2)).abortOperation(anyLong());
+
+    scheduler.stop();
+  }
+
 }