Browse Source

AMBARI-17374. Ambari reports "IN PROGRESS" status for a finished install task. (mpapirkovskyy)

Myroslav Papirkovskyi 9 năm trước cách đây
mục cha
commit
741c1eab84

+ 7 - 2
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -26,6 +26,7 @@ import pprint
 import os
 import ambari_simplejson as json
 import time
+import signal
 
 from AgentException import AgentException
 from LiveStatus import LiveStatus
@@ -141,7 +142,7 @@ class ActionQueue(threading.Thread):
           logger.info("Canceling " + queued_command['commandType'] + \
                       " for service " + queued_command['serviceName'] + \
                       " and role " +  queued_command['role'] + \
-                      " with taskId " + queued_command['taskId'])
+                      " with taskId " + str(queued_command['taskId']))
 
       # Kill if in progress
       self.customServiceOrchestrator.cancel_command(task_id, reason)
@@ -313,7 +314,11 @@ class ActionQueue(threading.Thread):
         if commandresult['exitcode'] == 0:
           status = self.COMPLETED_STATUS
         else:
-          status = self.FAILED_STATUS
+          if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL):
+            logger.info('Command {cid} was canceled!'.format(cid=taskId))
+            return
+          else:
+            status = self.FAILED_STATUS
 
       if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
         delay = self.get_retry_delay(delay)

+ 4 - 1
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -244,7 +244,10 @@ class CustomServiceOrchestrator():
         logger.debug('Pop with taskId %s' % task_id)
         pid = self.commands_in_progress.pop(task_id)
         if not isinstance(pid, int):
-          return '\nCommand aborted. ' + pid
+          if pid:
+            return '\nCommand aborted. ' + pid
+          else:
+            return ''
     return None
 
   def requestComponentStatus(self, command):

+ 7 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionManager.java

@@ -20,6 +20,7 @@ package org.apache.ambari.server.actionmanager;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -141,6 +142,12 @@ public class ActionManager {
       return;
     }
 
+    // Put the reports into ascending task-id order before they are processed.
+    // Collections.sort works in place, so the list passed in must be mutable.
+    Collections.sort(reports, new Comparator<CommandReport>() {
+      @Override
+      public int compare(CommandReport o1, CommandReport o2) {
+        // Long.compare yields the correct sign for any pair of ids; subtracting
+        // and narrowing the difference to int can overflow and invert the order.
+        return Long.compare(o1.getTaskId(), o2.getTaskId());
+      }
+    });
     List<CommandReport> reportsToProcess = new ArrayList<CommandReport>();
     Iterator<HostRoleCommand> commandIterator = commands.iterator();
     //persist the action response into the db.

+ 13 - 0
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -714,6 +714,8 @@ class ActionScheduler implements Runnable {
             LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
             actionQueue.dequeue(host, c.getCommandId());
           } else {
+            cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)));
+
             // reschedule command
             commandsToSchedule.add(c);
             LOG.trace("===> commandsToSchedule(reschedule)=" + commandsToSchedule.size());
@@ -1061,6 +1063,17 @@ class ActionScheduler implements Runnable {
       }
     }
   }
+  /**
+   * Enqueues a {@link CancelCommand} for every given command that is still
+   * {@code QUEUED} or {@code IN_PROGRESS}. Commands in any other status are
+   * left untouched.
+   *
+   * @param hostRoleCommands commands to inspect; each cancel command is put on
+   *                         the action queue of the command's own host, with an
+   *                         empty reason
+   */
+  void cancelCommandOnTimeout(Collection<HostRoleCommand> hostRoleCommands) {
+    for (HostRoleCommand task : hostRoleCommands) {
+      HostRoleStatus taskStatus = task.getStatus();
+      boolean stillActive = taskStatus == HostRoleStatus.QUEUED
+          || taskStatus == HostRoleStatus.IN_PROGRESS;
+      if (!stillActive) {
+        continue;
+      }
+
+      CancelCommand cancellation = new CancelCommand();
+      cancellation.setTargetTaskId(task.getTaskId());
+      cancellation.setReason("");
+      actionQueue.enqueue(task.getHostName(), cancellation);
+    }
+  }
 
 
   /**

+ 56 - 0
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionManager.java

@@ -133,6 +133,45 @@ public class TestActionManager {
     assertFalse(db.getRequest(requestId).getEndTime() == -1);
   }
 
+  /**
+   * Feeds the action manager two command reports whose task ids are out of
+   * order (task 2 is added before task 1) and checks the resulting role
+   * statuses: HBASE_MASTER must be IN_PROGRESS and HBASE_REGIONSERVER must
+   * be PENDING.
+   */
+  @Test
+  public void testActionResponsesUnsorted() throws AmbariException {
+    ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
+    ActionManager am = new ActionManager(5000, 1200000, new ActionQueue(),
+        clusters, db, new HostsMap((String) null), unitOfWork,
+        injector.getInstance(RequestFactory.class), null, null);
+    populateActionDBWithTwoCommands(db, hostname);
+    Stage stage = db.getAllStages(requestId).get(0);
+    Assert.assertEquals(stageId, stage.getStageId());
+    // Only HBASE_MASTER is marked QUEUED and scheduled; HBASE_REGIONSERVER is
+    // not touched here.
+    stage.setHostRoleStatus(hostname, "HBASE_MASTER", HostRoleStatus.QUEUED);
+    db.hostRoleScheduled(stage, hostname, "HBASE_MASTER");
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    // First report in the list carries the HIGHER task id (2).
+    CommandReport cr = new CommandReport();
+    cr.setTaskId(2);
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setRole("HBASE_REGIONSERVER");
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("ERROR");
+    cr.setStdOut("OUTPUT");
+    cr.setStructuredOut("STRUCTURED_OUTPUT");
+    cr.setExitCode(215);
+    reports.add(cr);
+    // Second report in the list carries the LOWER task id (1).
+    CommandReport cr2 = new CommandReport();
+    cr2.setTaskId(1);
+    cr2.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr2.setRole("HBASE_MASTER");
+    cr2.setStatus("IN_PROGRESS");
+    cr2.setStdErr("ERROR");
+    cr2.setStdOut("OUTPUT");
+    cr2.setStructuredOut("STRUCTURED_OUTPUT");
+    cr2.setExitCode(215);
+    reports.add(cr2);
+    // The task list is requested in id order (1, 2) while the reports arrive as (2, 1).
+    am.processTaskResponse(hostname, reports, am.getTasks(Arrays.asList(new Long[]{1L, 2L})));
+    assertEquals(HostRoleStatus.IN_PROGRESS, am.getAction(requestId, stageId)
+        .getHostRoleStatus(hostname, "HBASE_MASTER"));
+    // NOTE(review): the region server report says COMPLETED yet PENDING is
+    // expected, presumably because that command was never scheduled; confirm
+    // against ActionManager.processTaskResponse.
+    assertEquals(HostRoleStatus.PENDING, am.getAction(requestId, stageId)
+        .getHostRoleStatus(hostname, "HBASE_REGIONSERVER"));
+  }
+
   @Test
   public void testLargeLogs() throws AmbariException {
     ActionDBAccessor db = injector.getInstance(ActionDBAccessorImpl.class);
@@ -189,6 +228,23 @@ public class TestActionManager {
     db.persistActions(request);
   }
 
+  /**
+   * Persists a request made of a single stage that carries two START commands
+   * for the given host: HBASE_MASTER is added first, HBASE_REGIONSERVER second.
+   *
+   * @param db       accessor the request is persisted through
+   * @param hostname host both commands are addressed to
+   * @throws AmbariException declared by the signature; the visible code does
+   *                         not show which call raises it
+   */
+  private void populateActionDBWithTwoCommands(ActionDBAccessor db, String hostname) throws AmbariException {
+    Stage stage = stageFactory.createNew(requestId, "/a/b", "cluster1", 1L, "action manager test",
+        "clusterHostInfo", "commandParamsStage", "hostParamsStage");
+    stage.setStageId(stageId);
+
+    // Commands are added in array order: master first, region server second.
+    Role[] roles = {Role.HBASE_MASTER, Role.HBASE_REGIONSERVER};
+    for (Role role : roles) {
+      ServiceComponentHostStartEvent startEvent =
+          new ServiceComponentHostStartEvent(role.toString(), hostname, System.currentTimeMillis());
+      stage.addHostRoleExecutionCommand(hostname, role, RoleCommand.START, startEvent,
+          "cluster1", "HBASE", false, false);
+    }
+
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(stage);
+    db.persistActions(new Request(stages, clusters));
+  }
+
   // Test failing ... tracked by Jira BUG-4966
   @Ignore
   @Test

+ 13 - 7
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

@@ -200,15 +200,18 @@ public class TestActionScheduler {
     scheduler.setTaskTimeoutAdjustment(false);
 
     List<AgentCommand> ac = waitForQueueSize(hostname, aq, 1, scheduler);
-    assertTrue(ac.get(0) instanceof ExecutionCommand);
-    assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
-    assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
+    AgentCommand scheduledCommand = ac.get(0);
+    assertTrue(scheduledCommand instanceof ExecutionCommand);
+    assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId());
+    assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo());
 
     //The action status has not changed, it should be queued again.
-    ac = waitForQueueSize(hostname, aq, 1, scheduler);
-    assertTrue(ac.get(0) instanceof ExecutionCommand);
-    assertEquals("1-977", ((ExecutionCommand) (ac.get(0))).getCommandId());
-    assertEquals(clusterHostInfo, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
+    ac = waitForQueueSize(hostname, aq, 2, scheduler);
+    // first command is cancel for previous
+    scheduledCommand = ac.get(1);
+    assertTrue(scheduledCommand instanceof ExecutionCommand);
+    assertEquals("1-977", ((ExecutionCommand) scheduledCommand).getCommandId());
+    assertEquals(clusterHostInfo, ((ExecutionCommand) scheduledCommand).getClusterHostInfo());
 
     //Now change the action status
     s.setHostRoleStatus(hostname, "NAMENODE", HostRoleStatus.COMPLETED);
@@ -313,6 +316,9 @@ public class TestActionScheduler {
     //Check that in_progress command is rescheduled
     assertEquals(HostRoleStatus.QUEUED, stages.get(0).getHostRoleStatus(hostname, "SECONDARY_NAMENODE"));
 
+    // Check that a cancel command was generated on timeout
+    assertFalse(aq.dequeue(hostname, AgentCommandType.CANCEL_COMMAND).isEmpty());
+
     //Switch command back to IN_PROGRESS status and check that other command is not rescheduled
     stages.get(0).setHostRoleStatus(hostname, "SECONDARY_NAMENODE", HostRoleStatus.IN_PROGRESS);
     scheduler.doWork();

+ 3 - 1
ambari-server/src/test/java/org/apache/ambari/server/agent/HeartbeatProcessorTest.java

@@ -1269,7 +1269,9 @@ public class HeartbeatProcessorTest {
     cmdReport.setRole("install_packages");
     cmdReport.setClusterName(DummyCluster);
 
-    hb.setReports(Collections.singletonList(cmdReport));
+    List<CommandReport> reports = new ArrayList<>();
+    reports.add(cmdReport);
+    hb.setReports(reports);
     hb.setTimestamp(0L);
     hb.setResponseId(0);
     hb.setNodeStatus(new HostStatus(HostStatus.Status.HEALTHY, DummyHostStatus));