Kaynağa Gözat

AMBARI-5480. There's significant lag for the Restart Indicators to go away after restarting (dlysnichenko)

Lisnichenko Dmitro 11 yıl önce
ebeveyn
işleme
e445a9ce48

+ 4 - 0
ambari-agent/src/main/python/ambari_agent/ActionQueue.py

@@ -208,6 +208,10 @@ class ActionQueue(threading.Thread):
     if roleResult['stderr'] == '':
       roleResult['stderr'] = 'None'
 
+    # let the Ambari server know the name of the custom command
+    if command['hostLevelParams'].has_key('custom_command'):
+      roleResult['customCommand'] = command['hostLevelParams']['custom_command']
+
     if 'structuredOut' in commandresult:
       roleResult['structuredOut'] = str(commandresult['structuredOut'])
     else:

+ 9 - 2
ambari-agent/src/test/python/ambari_agent/TestActionQueue.py

@@ -57,6 +57,7 @@ class TestActionQueue(TestCase):
     'taskId': 3,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
+    'hostLevelParams': {},
     'configurations':{'global' : {}},
     'configurationTags':{'global' : { 'tag': 'v1' }}
   }
@@ -69,7 +70,7 @@ class TestActionQueue(TestCase):
       'serviceName' : "serviceName",
       'roleCommand' : 'UPGRADE',
       'hostname' : "localhost.localdomain",
-      'hostLevelParams': "hostLevelParams",
+      'hostLevelParams': {},
       'clusterHostInfo': "clusterHostInfo",
       'commandType': "EXECUTION_COMMAND",
       'configurations':{'global' : {}},
@@ -88,6 +89,7 @@ class TestActionQueue(TestCase):
     'taskId': 4,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
+    'hostLevelParams': {}
     }
 
   snamenode_install_command = {
@@ -98,6 +100,7 @@ class TestActionQueue(TestCase):
     'taskId': 5,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
+    'hostLevelParams': {}
     }
 
   nagios_install_command = {
@@ -108,6 +111,7 @@ class TestActionQueue(TestCase):
     'taskId': 6,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
+    'hostLevelParams': {}
     }
 
   hbase_install_command = {
@@ -118,6 +122,7 @@ class TestActionQueue(TestCase):
     'taskId': 7,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
+    'hostLevelParams': {}
     }
 
   status_command = {
@@ -125,7 +130,8 @@ class TestActionQueue(TestCase):
     "commandType" : "STATUS_COMMAND",
     "clusterName" : "",
     "componentName" : "DATANODE",
-    'configurations':{}
+    'configurations':{},
+    'hostLevelParams': {}
   }
 
   datanode_restart_command = {
@@ -411,6 +417,7 @@ class TestActionQueue(TestCase):
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'HDFS',
                 'status': 'COMPLETED',
+                'customCommand': 'RESTART',
                 'stderr': 'stderr',
                 'stdout': 'out',
                 'structuredOut': '',

+ 13 - 2
ambari-server/src/main/java/org/apache/ambari/server/agent/CommandReport.java

@@ -35,9 +35,19 @@ public class CommandReport {
   private String serviceName;
   private long taskId;
   private String roleCommand;
-  
+  private String customCommand;
   private Map<String, Map<String, String>> configurationTags;
-  
+
+  @JsonProperty("customCommand")
+  public String getCustomCommand() {
+    return customCommand;
+  }
+
+  @JsonProperty("customCommand")
+  public void setCustomCommand(String customCommand) {
+    this.customCommand = customCommand;
+  }
+
   @JsonProperty("taskId")
   public long getTaskId() {
     return taskId;
@@ -178,6 +188,7 @@ public class CommandReport {
             ", taskId=" + taskId +
             ", roleCommand=" + roleCommand +
             ", configurationTags=" + configurationTags +
+            ", customCommand=" + customCommand +
             '}';
   }
 }

+ 23 - 4
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -302,8 +302,12 @@ public class HeartBeatHandler {
     List<CommandReport> reports = heartbeat.getReports();
     for (CommandReport report : reports) {
       LOG.debug("Received command report: " + report);
+      // let custom START, STOP and RESTART reports through; skip all other custom commands
       if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
-        RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand())) {
+         (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+         !("RESTART".equals(report.getCustomCommand()) ||
+         "START".equals(report.getCustomCommand()) ||
+         "STOP".equals(report.getCustomCommand())))) {
         continue;
       }
       
@@ -326,7 +330,10 @@ public class HeartBeatHandler {
             // Updating stack version, if needed
             if (scHost.getState().equals(State.UPGRADING)) {
               scHost.setStackVersion(scHost.getDesiredStackVersion());
-            } else if (report.getRoleCommand().equals(RoleCommand.START.toString())
+            } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
+                (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
+                    ("START".equals(report.getCustomCommand()) ||
+                    "RESTART".equals(report.getCustomCommand()))))
                 && null != report.getConfigurationTags()
                 && !report.getConfigurationTags().isEmpty()) {
               LOG.info("Updating applied config on service " + scHost.getServiceName() +
@@ -334,10 +341,22 @@ public class HeartBeatHandler {
               scHost.updateActualConfigs(report.getConfigurationTags());
             }
 
-            if (RoleCommand.START.toString().equals(report.getRoleCommand())) {
+            if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                !("START".equals(report.getCustomCommand()) ||
+                 "STOP".equals(report.getCustomCommand()))) {
+              // do not change state for custom commands other than START and STOP;
+              // let status commands be responsible for that
+              continue;
+            }
+
+            if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                    "START".equals(report.getCustomCommand()))) {
               scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
                   hostname, now));
-            } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand())) {
+            } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
+                (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
+                    "STOP".equals(report.getCustomCommand()))) {
               scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
                   hostname, now));
             } else {

+ 156 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -229,6 +229,162 @@ public class TestHeartbeatHandler {
     Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
   }
 
+  @Test
+  public void testHeartbeatCustomCommandWithConfigs() throws Exception {
+    ActionManager am = getMockActionManager();
+
+    Cluster cluster = getDummyCluster();
+
+    @SuppressWarnings("serial")
+    Set<String> hostNames = new HashSet<String>(){{
+      add(DummyHostname1);
+    }};
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost2.setState(State.INSTALLED);
+
+
+    HeartBeat hb = new HeartBeat();
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setServiceName(HDFS);
+    cr.setRoleCommand("CUSTOM_COMMAND");
+    cr.setCustomCommand("RESTART");
+    cr.setTaskId(1);
+    cr.setRole(DATANODE);
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    cr.setClusterName(DummyCluster);
+    cr.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+    }});
+    CommandReport crn = new CommandReport();
+    crn.setActionId(StageUtils.getActionId(requestId, stageId));
+    crn.setServiceName(HDFS);
+    crn.setRoleCommand("CUSTOM_COMMAND");
+    crn.setCustomCommand("START");
+    crn.setTaskId(1);
+    crn.setRole(NAMENODE);
+    crn.setStatus("COMPLETED");
+    crn.setStdErr("");
+    crn.setStdOut("");
+    crn.setExitCode(215);
+    crn.setClusterName(DummyCluster);
+    crn.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
+      put("global", new HashMap<String,String>() {{ put("tag", "version1"); }});
+    }});
+
+    reports.add(cr);
+    reports.add(crn);
+    hb.setReports(reports);
+
+    handler.handleHeartBeat(hb);
+
+    // the heartbeat test passes if the actual configs are populated
+    Assert.assertNotNull(serviceComponentHost1.getActualConfigs());
+    Assert.assertEquals(serviceComponentHost1.getActualConfigs().size(), 1);
+    Assert.assertNotNull(serviceComponentHost2.getActualConfigs());
+    Assert.assertEquals(serviceComponentHost2.getActualConfigs().size(), 1);
+  }
+
+  @Test
+  public void testHeartbeatCustomStartStop() throws Exception {
+    ActionManager am = getMockActionManager();
+
+    Cluster cluster = getDummyCluster();
+
+    @SuppressWarnings("serial")
+    Set<String> hostNames = new HashSet<String>(){{
+      add(DummyHostname1);
+    }};
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.getServiceComponent(NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+    hdfs.getServiceComponent(SECONDARY_NAMENODE).addServiceComponentHost(DummyHostname1).persist();
+
+    ActionQueue aq = new ActionQueue();
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    ServiceComponentHost serviceComponentHost2 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(NAMENODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INSTALLED);
+    serviceComponentHost2.setState(State.STARTED);
+
+
+    HeartBeat hb = new HeartBeat();
+    hb.setResponseId(0);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb.setHostname(DummyHostname1);
+
+    List<CommandReport> reports = new ArrayList<CommandReport>();
+    CommandReport cr = new CommandReport();
+    cr.setActionId(StageUtils.getActionId(requestId, stageId));
+    cr.setServiceName(HDFS);
+    cr.setRoleCommand("CUSTOM_COMMAND");
+    cr.setCustomCommand("START");
+    cr.setTaskId(1);
+    cr.setRole(DATANODE);
+    cr.setStatus("COMPLETED");
+    cr.setStdErr("");
+    cr.setStdOut("");
+    cr.setExitCode(215);
+    cr.setClusterName(DummyCluster);
+    CommandReport crn = new CommandReport();
+    crn.setActionId(StageUtils.getActionId(requestId, stageId));
+    crn.setServiceName(HDFS);
+    crn.setRoleCommand("CUSTOM_COMMAND");
+    crn.setCustomCommand("STOP");
+    crn.setTaskId(1);
+    crn.setRole(NAMENODE);
+    crn.setStatus("COMPLETED");
+    crn.setStdErr("");
+    crn.setStdOut("");
+    crn.setExitCode(215);
+    crn.setClusterName(DummyCluster);
+
+    reports.add(cr);
+    reports.add(crn);
+    hb.setReports(reports);
+
+    handler.handleHeartBeat(hb);
+
+    // the test passes if custom START and STOP moved the components to STARTED and INSTALLED
+    State componentState1 = serviceComponentHost1.getState();
+    assertEquals(State.STARTED, componentState1);
+    State componentState2 = serviceComponentHost2.getState();
+    assertEquals(State.INSTALLED, componentState2);
+  }
+
   @Test
   public void testStatusHeartbeat() throws Exception {
     ActionManager am = getMockActionManager();