|
@@ -1005,54 +1005,67 @@ class TestAmbariServer(TestCase):
|
|
|
ambari_server.store_password_file("password", "passfile")
|
|
|
self.assertTrue(set_file_permissions_mock.called)
|
|
|
|
|
|
-
|
|
|
- @patch.object(FirewallChecks, "run_os_command")
|
|
|
+ @patch("subprocess.Popen")
|
|
|
@patch.object(OSCheck, "get_os_family")
|
|
|
@patch.object(OSCheck, "get_os_type")
|
|
|
@patch.object(OSCheck, "get_os_major_version")
|
|
|
- def test_check_iptables_is_running(self, get_os_major_version_mock, get_os_type_mock, get_os_family_mock, run_os_command_mock):
|
|
|
+ def test_check_iptables_is_running(self, get_os_major_version_mock, get_os_type_mock, get_os_family_mock, popen_mock):
|
|
|
|
|
|
get_os_major_version_mock.return_value = 18
|
|
|
get_os_type_mock.return_value = OSConst.OS_FEDORA
|
|
|
get_os_family_mock.return_value = OSConst.REDHAT_FAMILY
|
|
|
|
|
|
firewall_obj = Firewall().getFirewallObject()
|
|
|
- run_os_command_mock.return_value = 0, "active", ""
|
|
|
+ p = MagicMock()
|
|
|
+ p.communicate.return_value = ("active", "err")
|
|
|
+ p.returncode = 0
|
|
|
+ popen_mock.return_value = p
|
|
|
self.assertEqual("Fedora18FirewallChecks", firewall_obj.__class__.__name__)
|
|
|
self.assertTrue(firewall_obj.check_iptables())
|
|
|
- run_os_command_mock.return_value = 3, "", ""
|
|
|
+ p.communicate.return_value = ("", "err")
|
|
|
+ p.returncode = 3
|
|
|
self.assertFalse(firewall_obj.check_iptables())
|
|
|
+ self.assertEqual("err", firewall_obj.stderrdata)
|
|
|
|
|
|
|
|
|
get_os_type_mock.return_value = OSConst.OS_UBUNTU
|
|
|
get_os_family_mock.return_value = OSConst.DEBIAN_FAMILY
|
|
|
|
|
|
firewall_obj = Firewall().getFirewallObject()
|
|
|
- run_os_command_mock.return_value = 0, "Status: active", ""
|
|
|
+ p.communicate.return_value = ("Status: active", "err")
|
|
|
+ p.returncode = 0
|
|
|
self.assertEqual("UbuntuFirewallChecks", firewall_obj.__class__.__name__)
|
|
|
self.assertTrue(firewall_obj.check_iptables())
|
|
|
- run_os_command_mock.return_value = 0, "Status: inactive", ""
|
|
|
+ p.communicate.return_value = ("Status: inactive", "err")
|
|
|
+ p.returncode = 0
|
|
|
self.assertFalse(firewall_obj.check_iptables())
|
|
|
+ self.assertEqual("err", firewall_obj.stderrdata)
|
|
|
|
|
|
get_os_type_mock.return_value = ""
|
|
|
get_os_family_mock.return_value = OSConst.SUSE_FAMILY
|
|
|
|
|
|
firewall_obj = Firewall().getFirewallObject()
|
|
|
- run_os_command_mock.return_value = 0, "### iptables", ""
|
|
|
+ p.communicate.return_value = ("### iptables", "err")
|
|
|
+ p.returncode = 0
|
|
|
self.assertEqual("SuseFirewallChecks", firewall_obj.__class__.__name__)
|
|
|
self.assertTrue(firewall_obj.check_iptables())
|
|
|
- run_os_command_mock.return_value = 0, "SuSEfirewall2 not active", ""
|
|
|
+ p.communicate.return_value = ("SuSEfirewall2 not active", "err")
|
|
|
+ p.returncode = 0
|
|
|
self.assertFalse(firewall_obj.check_iptables())
|
|
|
+ self.assertEqual("err", firewall_obj.stderrdata)
|
|
|
|
|
|
get_os_type_mock.return_value = ""
|
|
|
get_os_family_mock.return_value = OSConst.REDHAT_FAMILY
|
|
|
|
|
|
firewall_obj = Firewall().getFirewallObject()
|
|
|
- run_os_command_mock.return_value = 0, "Table: filter", ""
|
|
|
+ p.communicate.return_value = ("Table: filter", "err")
|
|
|
+ p.returncode = 0
|
|
|
self.assertEqual("FirewallChecks", firewall_obj.__class__.__name__)
|
|
|
self.assertTrue(firewall_obj.check_iptables())
|
|
|
- run_os_command_mock.return_value = 3, "", ""
|
|
|
+ p.communicate.return_value = ("", "err")
|
|
|
+ p.returncode = 3
|
|
|
self.assertFalse(firewall_obj.check_iptables())
|
|
|
+ self.assertEqual("err", firewall_obj.stderrdata)
|
|
|
|
|
|
|
|
|
def test_dlprogress(self):
|