Browse Source

AMBARI-1534. Add Nagios check for ambari-server process (Vitaly Brodetskyi via dlysnichenko)

Lisnichenko Dmitro 11 years ago
parent
commit
8bf3d0b56c

+ 1 - 0
ambari-agent/conf/unix/ambari-agent.ini

@@ -23,6 +23,7 @@ prefix=/var/lib/ambari-agent/data
 loglevel=INFO
 data_cleanup_interval=86400
 data_cleanup_max_age=2592000
+ping_port=0
 
 [stack]
 installprefix=/var/ambari-agent/

+ 5 - 0
ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-commands.cfg.erb

@@ -106,3 +106,8 @@ define command{
         command_name    check_resourcemanager_nodes_percentage
         command_line    $USER1$/check_resourcemanager_nodes_percentage.sh $HOSTADDRESS$ $ARG1$ $ARG2$ $ARG3$ $ARG4$
        }
+
+# check_tcp_on_host: runs the check_tcp plugin against host $ARG1$ on port $ARG2$.
+# $ARG3$ is appended verbatim, so callers can pass extra check_tcp options through it.
+define command{
+	command_name	check_tcp_on_host
+	command_line	$USER1$/check_tcp -H $ARG1$ -p $ARG2$ $ARG3$
+	}

+ 14 - 0
ambari-agent/src/main/puppet/modules/hdp-nagios/templates/hadoop-services.cfg.erb

@@ -77,6 +77,20 @@ define service {
         max_check_attempts      2
 }
 
+# AMBARI AGENT Checks: one TCP check per entry of all_hosts, using the port found at
+# the same index in all_ping_ports.
+# NOTE(review): assumes all_ping_ports holds a usable port for every host in all_hosts - confirm
+# what is rendered for a host whose ping port has not been reported yet.
+<%scope.function_hdp_template_var("all_hosts").each_with_index do |hostname, index|-%>
+define service {
+        hostgroup_name          agent-servers
+        use                     hadoop-service
+        service_description     AMBARI::Ambari Agent process on <%=hostname%>
+        servicegroups           AMBARI
+        check_command           check_tcp_on_host!<%=hostname%>!<%=scope.function_hdp_template_var("all_ping_ports")[index]%>!-w 1 -c 1
+        normal_check_interval   1
+        retry_check_interval    0.25
+        max_check_attempts      4
+}
+
+<%end-%>
+
 # NAGIOS SERVER ZOOKEEPER Checks
 <%if scope.function_hdp_nagios_members_exist('zookeeper-servers')-%>
 define service {

+ 1 - 0
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -33,6 +33,7 @@ secured_url_port=8441
 prefix=/tmp/ambari-agent
 data_cleanup_interval=86400
 data_cleanup_max_age=2592000
+ping_port=0
 
 [services]
 

+ 64 - 0
ambari-agent/src/main/python/ambari_agent/PingPortListener.py

@@ -0,0 +1,64 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import sys
+import logging
+import AmbariConfig
+import threading
+import socket
+
+logger = logging.getLogger()
+
+class PingPortListener(threading.Thread):
+
+
+  def __init__(self, config):
+    threading.Thread.__init__(self)
+    self.daemon = True
+    self.running = True
+    self.config = config
+    self.host = '0.0.0.0'
+    self.port = int(self.config.get('agent','ping_port'))
+    try:
+      self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+      self.socket.bind((self.host, self.port))
+      self.socket.listen(1)
+    except Exception as ex:
+      logger.error("Failed to start ping port listener of:" + str(ex));
+      sys.exit(1)
+    else:
+      config.set('agent','current_ping_port',str(self.socket.getsockname()[1]))
+      logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1]))
+
+
+  def __del__(self):
+    logger.info("Ping port listener killed")
+
+
+  def run(self):
+    while  self.running:
+      try:
+        conn, addr = self.socket.accept()
+        conn.send("OK")
+        conn.close()
+      except Exception as ex:
+        logger.error("Failed in Ping port listener because of:" + str(ex));
+        sys.exit(1)
+  pass

+ 2 - 0
ambari-agent/src/main/python/ambari_agent/Register.py

@@ -43,10 +43,12 @@ class Register:
     hostInfo.register(agentEnv, False, False)
 
     version = self.read_agent_version()
+    current_ping_port = self.config.get('agent','current_ping_port')
     
     register = { 'responseId'        : int(id),
                  'timestamp'         : timestamp,
                  'hostname'          : hostname.hostname(),
+                 'currentPingPort'   : int(current_ping_port),
                  'publicHostname'    : hostname.public_hostname(),
                  'hardwareProfile'   : self.hardware.get(),
                  'agentEnv'          : agentEnv,

+ 5 - 0
ambari-agent/src/main/python/ambari_agent/main.py

@@ -33,6 +33,7 @@ from shell import killstaleprocesses
 import AmbariConfig
 from security import CertificateManager
 from NetUtil import NetUtil
+from PingPortListener import PingPortListener
 import security
 import hostname
 from DataCleaner import DataCleaner
@@ -209,6 +210,10 @@ def main():
 
   killstaleprocesses()
 
+  # Starting ping port listener
+  ping_port_listener = PingPortListener(config)
+  ping_port_listener.start()
+
   update_log_level(config)
 
   server_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'url_port')

+ 9 - 3
ambari-agent/src/test/python/TestMain.py

@@ -31,6 +31,7 @@ from ambari_agent.AmbariConfig import AmbariConfig
 import ConfigParser
 import os
 import tempfile
+from ambari_agent.PingPortListener import PingPortListener
 from ambari_agent.Controller import Controller
 from optparse import OptionParser
 from ambari_agent.DataCleaner import DataCleaner
@@ -238,12 +239,15 @@ class TestMain(unittest.TestCase):
   @patch("optparse.OptionParser.parse_args")
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
-  def test_main(self, data_clean_init_mock,data_clean_start_mock, parse_args_mock, join_mock, start_mock,
-                Controller_init_mock, try_to_connect_mock, update_log_level_mock,
-                killstaleprocesses_mock, daemonize_mock, perform_prestart_checks_mock,
+  @patch.object(PingPortListener,"start")
+  @patch.object(PingPortListener,"__init__")
+  def test_main(self, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
+                parse_args_mock, join_mock, start_mock, Controller_init_mock, try_to_connect_mock,
+                update_log_level_mock, killstaleprocesses_mock, daemonize_mock, perform_prestart_checks_mock,
                 resolve_ambari_config_mock, stop_mock, bind_signal_handlers_mock, setup_logging_mock):
     data_clean_init_mock.return_value = None
     Controller_init_mock.return_value = None
+    ping_port_init_mock.return_value = None
     options = MagicMock()
     parse_args_mock.return_value = (options, MagicMock)
 
@@ -262,6 +266,8 @@ class TestMain(unittest.TestCase):
     self.assertTrue(start_mock.called)
     self.assertTrue(data_clean_init_mock.called)
     self.assertTrue(data_clean_start_mock.called)
+    self.assertTrue(ping_port_init_mock.called)
+    self.assertTrue(ping_port_start_mock.called)
 
     perform_prestart_checks_mock.reset_mock()
 

+ 60 - 0
ambari-agent/src/test/python/TestPingPortListener.py

@@ -0,0 +1,60 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import unittest
+from mock.mock import patch, MagicMock, call, Mock
+from ambari_agent import PingPortListener
+import socket
+import sys
+
+class TestPingPortListener(unittest.TestCase):
+
+  def setUp(self):
+    self.config = MagicMock()
+    self.config.get.return_value = 55000
+    PingPortListener.logger = MagicMock()
+
+  @patch("socket.socket")
+  def test_init_success(self,socketMock):
+    PingPortListener.logger.reset_mock()
+    allive_daemon = PingPortListener.PingPortListener(self.config)
+    self.assertFalse(PingPortListener.logger.warn.called)
+    self.assertTrue(socketMock.call_args_list[0][0][0] == socket.AF_INET)
+    self.assertTrue(socketMock.call_args_list[0][0][1] == socket.SOCK_STREAM)
+    self.assertTrue(allive_daemon.socket.bind.call_args_list[0][0][0] == ('0.0.0.0',55000))
+    self.assertTrue(allive_daemon.socket.listen.call_args_list[0][0][0] == 1)
+    self.assertTrue(allive_daemon.config.set.call_args_list[0][0][0] == 'agent')
+    self.assertTrue(allive_daemon.config.set.call_args_list[0][0][1] == 'current_ping_port')
+
+
+
+  @patch.object(socket.socket,"bind")
+  @patch.object(socket.socket,"listen")
+  @patch.object(socket.socket,"__init__")
+  @patch.object(sys, "exit")
+  def test_init_warn(self, sys_exit_mock, socketInitMock,socketListenMock,socketBindMock):
+    PingPortListener.logger.reset_mock()
+    allive_daemon = PingPortListener.PingPortListener(self.config)
+    self.assertTrue(socketInitMock.called)
+    self.assertTrue(sys_exit_mock.called)
+
+if __name__ == "__main__":
+  suite = unittest.TestLoader().loadTestsFromTestCase(PingPortListener)
+  unittest.TextTestRunner(verbosity=2).run(suite)

+ 3 - 1
ambari-agent/src/test/python/TestRegistration.py

@@ -34,6 +34,7 @@ class TestRegistration(TestCase):
     config = AmbariConfig().getConfig()
     tmpdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tmpdir)
+    config.set('agent', 'current_ping_port', '33777')
     get_os_type_method.return_value = 'redhat'
     ver_file = os.path.join(tmpdir, "version")
     with open(ver_file, "w") as text_file:
@@ -51,6 +52,7 @@ class TestRegistration(TestCase):
     self.assertEquals(data['agentVersion'], '1.3.0', "agentVersion should not be empty")
     print data['agentEnv']['umask']
     self.assertEquals(not data['agentEnv']['umask']== "", True, "agents umask should not be empty")
-    self.assertEquals(len(data), 7)
+    self.assertEquals(data['currentPingPort'] == 33777, True, "current ping port should be 33777")
+    self.assertEquals(len(data), 8)
 
     os.remove(ver_file)

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -395,6 +395,7 @@ public class HeartBeatHandler {
   public RegistrationResponse handleRegistration(Register register)
       throws InvalidStateTransitionException, AmbariException {
     String hostname = register.getHostname();
+    int currentPingPort = register.getCurrentPingPort();
     long now = System.currentTimeMillis();
 
     String agentVersion = register.getAgentVersion();
@@ -437,6 +438,9 @@ public class HeartBeatHandler {
     // Resetting host state
     hostObject.setState(HostState.INIT);
 
+    // Set ping port for agent
+    hostObject.setCurrentPingPort(currentPingPort);
+
     // Get status of service components
     List<StatusCommand> cmds = heartbeatMonitor.generateStatusCommands(hostname);
 

+ 11 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/Register.java

@@ -29,6 +29,7 @@ public class Register {
   private int responseId = -1;
   private long timestamp;
   private String hostname;
+  private int currentPingPort;
   private HostInfo hardwareProfile;
   private String publicHostname;
   private AgentEnv agentEnv;
@@ -92,11 +93,20 @@ public class Register {
     this.agentVersion = agentVersion;
   }
 
+  /**
+   * @return the ping port carried by this registration message
+   */
+  public int getCurrentPingPort() {
+    return currentPingPort;
+  }
+
+  /**
+   * @param currentPingPort the ping port to carry in this registration message
+   */
+  public void setCurrentPingPort(int currentPingPort) {
+    this.currentPingPort = currentPingPort;
+  }
+
   @Override
   public String toString() {
     String ret = "responseId=" + responseId + "\n" +
              "timestamp=" + timestamp + "\n" +
-             "hostname="  + hostname + "\n";
+             "hostname="  + hostname + "\n" +
+             "currentPingPort=" + currentPingPort + "\n";
 
     if (hardwareProfile != null)
       ret = ret + "hardwareprofile=" + this.hardwareProfile.toString();

+ 10 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/Host.java

@@ -39,6 +39,16 @@ public interface Host {
    */
   public void setHostName(String hostName);
 
+  /**
+   * Gets the ping port currently recorded for this host.
+   *
+   * @return the current ping port, or null when none has been set
+   *         (HostImpl initialises the value to null)
+   */
+  public Integer getCurrentPingPort();
+
+  /**
+   * Records the ping port for this host.
+   *
+   * @param currentPingPort the ping port to record for this host
+   */
+  public void setCurrentPingPort(Integer currentPingPort);
+
   /**
    * Gets the public-facing host name.
    */

+ 23 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/host/HostImpl.java

@@ -90,6 +90,7 @@ public class HostImpl implements Host {
   private long lastHeartbeatTime = 0L;
   private AgentEnv lastAgentEnv = null;
   private boolean persisted = false;
+  private Integer currentPingPort = null;
 
   private static final String HARDWAREISA = "hardware_isa";
   private static final String HARDWAREMODEL = "hardware_model";
@@ -567,6 +568,28 @@ public class HostImpl implements Host {
     }
   }
   
+  /**
+   * Returns the stored ping port while holding the read lock.
+   * The value is null until setCurrentPingPort has been called with a non-null port.
+   */
+  @Override
+  public Integer getCurrentPingPort() {
+    try {
+      readLock.lock();
+      return currentPingPort;
+    }
+    finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Stores the given ping port while holding the write lock.
+   * The value is kept in a plain field of this object only.
+   */
+  @Override
+  public void setCurrentPingPort(Integer currentPingPort) {
+    try {
+      writeLock.lock();
+      this.currentPingPort = currentPingPort;
+    }
+    finally {
+      writeLock.unlock();
+    }
+  }
+
   @Override
   public void setPublicHostName(String hostName) {
     try {

+ 4 - 2
ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java

@@ -233,13 +233,15 @@ public class StageUtils {
       }
     }
 
-    // Add a list of all host for agent and host monitoring
+    // Add lists of all hosts and all ping ports for agent and host monitoring
     List<String> allHostNames = new ArrayList<String>();
+    List<String> allHostPingPorts = new ArrayList<String>();
     for (Host host : allHosts.values()) {
       allHostNames.add(host.getHostName());
+      allHostPingPorts.add(host.getCurrentPingPort() == null ? null : host.getCurrentPingPort().toString());
     }
     info.put("all_hosts", allHostNames);
-
+    info.put("all_ping_ports", allHostPingPorts);
     return info;
   }
 

+ 2 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/DummyHeartbeatConstants.java

@@ -31,6 +31,8 @@ public interface DummyHeartbeatConstants {
   String DummyOsType = "centos5";
   String DummyOSRelease = "5.8";
 
+  Integer DummyCurrentPingPort = 33555;
+
   String DummyHostStatus = "I am ok";
 
   String DummyStackId = "HDP-0.1";

+ 3 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -31,6 +31,7 @@ import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS_CLIENT;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
 import static org.apache.ambari.server.agent.DummyHeartbeatConstants.SECONDARY_NAMENODE;
+import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyCurrentPingPort;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -477,11 +478,13 @@ public class TestHeartbeatHandler {
     hi.setHostName(DummyHostname1);
     hi.setOS(DummyOsType);
     reg.setHostname(DummyHostname1);
+    reg.setCurrentPingPort(DummyCurrentPingPort);
     reg.setHardwareProfile(hi);
     reg.setAgentVersion(metaInfo.getServerVersion());
     handler.handleRegistration(reg);
     assertEquals(hostObject.getState(), HostState.HEALTHY);
     assertEquals(DummyOsType, hostObject.getOsType());
+    assertEquals(DummyCurrentPingPort, hostObject.getCurrentPingPort());
     assertTrue(hostObject.getLastRegistrationTime() != 0);
     assertEquals(hostObject.getLastHeartbeatTime(),
         hostObject.getLastRegistrationTime());

+ 5 - 0
ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java

@@ -194,6 +194,10 @@ public class TestStageUtils {
     fsm.getHost("h2").setOsType("centos5");
     fsm.getHost("h3").setOsType("centos5");
     fsm.getHost("h4").setOsType("centos5");
+    fsm.getHost("h1").setCurrentPingPort(1024);
+    fsm.getHost("h2").setCurrentPingPort(1024);
+    fsm.getHost("h3").setCurrentPingPort(1024);
+    fsm.getHost("h4").setCurrentPingPort(1024);
     fsm.getHost("h1").persist();
     fsm.getHost("h2").persist();
     fsm.getHost("h3").persist();
@@ -213,6 +217,7 @@ public class TestStageUtils {
     assertEquals(2, info.get("hbase_rs_hosts").size());
     assertEquals(1, info.get("hbase_master_hosts").size());
     assertEquals(4, info.get("all_hosts").size());
+    assertEquals(4, info.get("all_ping_ports").size());
     assertEquals("h1", info.get("hbase_master_hosts").get(0));
 
     assertFalse(info.get("ambari_db_rca_url").get(0).contains(Configuration.HOSTNAME_MACRO));