|
|
@@ -35,6 +35,25 @@ from common_functions import OSCheck
|
|
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
+# OS info
|
|
|
+OS_VERSION = OSCheck().get_os_major_version()
|
|
|
+OS_TYPE = OSCheck.get_os_type()
|
|
|
+OS_FAMILY = OSCheck.get_os_family()
|
|
|
+
|
|
|
+# OS constants
|
|
|
+OS_UBUNTU_DEBIAN = 'debian'
|
|
|
+OS_UBUNTU = 'ubuntu'
|
|
|
+OS_FEDORA = 'fedora'
|
|
|
+OS_OPENSUSE = 'opensuse'
|
|
|
+OS_SUSE = 'suse'
|
|
|
+OS_SUSE_ENTERPRISE = 'sles'
|
|
|
+
|
|
|
+# service cmd
|
|
|
+SERVICE_CMD = "/sbin/service"
|
|
|
+
|
|
|
+# on ubuntu iptables service is called ufw
|
|
|
+if OS_FAMILY == OS_UBUNTU_DEBIAN:
|
|
|
+ SERVICE_CMD = "/usr/sbin/service"
|
|
|
|
|
|
class HostInfo:
|
|
|
# List of project names to be used to find alternatives folders etc.
|
|
|
@@ -102,18 +121,6 @@ class HostInfo:
|
|
|
# default timeout for async invoked processes
|
|
|
TIMEOUT_SECONDS = 60
|
|
|
RESULT_UNAVAILABLE = "unable_to_determine"
|
|
|
-
|
|
|
- OS_FAMILY = OSCheck.get_os_family()
|
|
|
- OS_UBUNTU_DEBIAN = 'debian'
|
|
|
- # service cmd
|
|
|
- SERVICE_CMD = "/sbin/service"
|
|
|
- FIREWALL_SERVICE_NAME = "iptables"
|
|
|
- # on ubuntu iptables service is called ufw
|
|
|
- if OS_FAMILY == OS_UBUNTU_DEBIAN:
|
|
|
- SERVICE_CMD = "/usr/sbin/service"
|
|
|
- FIREWALL_SERVICE_NAME = "ufw"
|
|
|
-
|
|
|
- FIREWALL_STATUS_CMD = "%s %s status" % (SERVICE_CMD, FIREWALL_SERVICE_NAME)
|
|
|
|
|
|
DEFAULT_SERVICE_NAME = "ntpd"
|
|
|
SERVICE_STATUS_CMD = "%s %s status" % (SERVICE_CMD, DEFAULT_SERVICE_NAME)
|
|
|
@@ -279,20 +286,25 @@ class HostInfo:
|
|
|
else:
|
|
|
return self.current_umask
|
|
|
|
|
|
+ def getFirewallObject(self):
|
|
|
+ if OS_TYPE == OS_UBUNTU:
|
|
|
+ return UbuntuFirewallChecks()
|
|
|
+ elif OS_TYPE == OS_FEDORA and int(OS_VERSION) >= 18:
|
|
|
+ return Fedora18FirewallChecks()
|
|
|
+ elif OS_TYPE == OS_SUSE or OS_TYPE == OS_OPENSUSE or OS_TYPE == OS_SUSE_ENTERPRISE:
|
|
|
+ return SuseFirewallChecks()
|
|
|
+ else:
|
|
|
+ return FirewallChecks()
|
|
|
|
|
|
- def checkIptables(self):
|
|
|
- iptablesIsRunning = True
|
|
|
- try:
|
|
|
- iptables = subprocess.Popen(["iptables", "-S"], stdout=subprocess.PIPE)
|
|
|
- stdout = iptables.communicate()
|
|
|
- if stdout == ('-P INPUT ACCEPT\n-P FORWARD ACCEPT\n-P OUTPUT ACCEPT\n', None):
|
|
|
- iptablesIsRunning = False
|
|
|
- except:
|
|
|
- pass
|
|
|
- return iptablesIsRunning
|
|
|
-
|
|
|
-
|
|
|
+ def getFirewallObjectTypes(self):
|
|
|
+ # To support test code, so tests can loop through the types
|
|
|
+ return (FirewallChecks,
|
|
|
+ UbuntuFirewallChecks,
|
|
|
+ Fedora18FirewallChecks,
|
|
|
+ SuseFirewallChecks)
|
|
|
|
|
|
+ def checkIptables(self):
|
|
|
+ return self.getFirewallObject().check_iptables()
|
|
|
|
|
|
""" Return various details about the host
|
|
|
componentsMapped: indicates if any components are mapped to this host
|
|
|
@@ -364,6 +376,77 @@ class HostInfo:
|
|
|
pass
|
|
|
|
|
|
|
|
|
+class FirewallChecks(object):
|
|
|
+ def __init__(self):
|
|
|
+ self.FIREWALL_SERVICE_NAME = "iptables"
|
|
|
+ self.SERVICE_CMD = SERVICE_CMD
|
|
|
+ self.SERVICE_SUBCMD = "status"
|
|
|
+
|
|
|
+ def get_command(self):
|
|
|
+ return "%s %s %s" % (self.SERVICE_CMD, self.FIREWALL_SERVICE_NAME, self.SERVICE_SUBCMD)
|
|
|
+
|
|
|
+ def check_result(self, retcode, out, err):
|
|
|
+ return retcode == 0
|
|
|
+
|
|
|
+ def check_iptables(self):
|
|
|
+ retcode, out, err = self.run_os_command(self.get_command())
|
|
|
+ return self.check_result(retcode, out, err)
|
|
|
+
|
|
|
+ def get_running_result(self):
|
|
|
+ # To support test code. Expected ouput from run_os_command.
|
|
|
+ return (0, "", "")
|
|
|
+
|
|
|
+ def get_stopped_result(self):
|
|
|
+ # To support test code. Expected output from run_os_command.
|
|
|
+ return (3, "", "")
|
|
|
+
|
|
|
+ def run_os_command(self, cmd):
|
|
|
+ if type(cmd) == str:
|
|
|
+ cmd = shlex.split(cmd)
|
|
|
+
|
|
|
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE,
|
|
|
+ stderr=subprocess.PIPE)
|
|
|
+
|
|
|
+ (stdoutdata, stderrdata) = process.communicate()
|
|
|
+ return process.returncode, stdoutdata, stderrdata
|
|
|
+
|
|
|
+
|
|
|
+class UbuntuFirewallChecks(FirewallChecks):
|
|
|
+ def __init__(self):
|
|
|
+ super(UbuntuFirewallChecks, self).__init__()
|
|
|
+
|
|
|
+ self.FIREWALL_SERVICE_NAME = "ufw"
|
|
|
+ self.SERVICE_CMD = 'service'
|
|
|
+
|
|
|
+ def check_result(self, retcode, out, err):
|
|
|
+ # On ubuntu, the status command returns 0 whether running or not
|
|
|
+ return out and len(out) > 0 and out.strip() != "ufw stop/waiting"
|
|
|
+
|
|
|
+ def get_running_result(self):
|
|
|
+ # To support test code. Expected ouput from run_os_command.
|
|
|
+ return (0, "ufw start/running", "")
|
|
|
+
|
|
|
+ def get_stopped_result(self):
|
|
|
+ # To support test code. Expected output from run_os_command.
|
|
|
+ return (0, "ufw stop/waiting", "")
|
|
|
+
|
|
|
+
|
|
|
+class Fedora18FirewallChecks(FirewallChecks):
|
|
|
+ def __init__(self):
|
|
|
+ self.FIREWALL_SERVICE_NAME = "firewalld.service"
|
|
|
+
|
|
|
+ def get_command(self):
|
|
|
+ return "systemctl is-active firewalld.service"
|
|
|
+
|
|
|
+
|
|
|
+class SuseFirewallChecks(FirewallChecks):
|
|
|
+ def __init__(self):
|
|
|
+ self.FIREWALL_SERVICE_NAME = "SuSEfirewall2"
|
|
|
+
|
|
|
+ def get_command(self):
|
|
|
+ return "/sbin/SuSEfirewall2 status"
|
|
|
+
|
|
|
+
|
|
|
def main(argv=None):
|
|
|
h = HostInfo()
|
|
|
struct = {}
|
|
|
@@ -372,4 +455,4 @@ def main(argv=None):
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
- main()
|
|
|
+ main()
|