123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- #!/usr/bin/env python2.6
- '''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- '''
- import StringIO
- import sys, subprocess
- from ambari_agent import NetUtil
- from ambari_agent.security import CertificateManager
- from mock.mock import MagicMock, patch, ANY
- import mock.mock
- import unittest
- from ambari_agent import ProcessHelper, main
- import logging
- import signal
- from ambari_agent.AmbariConfig import AmbariConfig
- import ConfigParser
- import ssl
- import os
- import tempfile
- from ambari_agent.Controller import Controller
- from ambari_agent import security
- aa = mock.mock.mock_open()
- class TestSecurity(unittest.TestCase):
- def setUp(self):
- # disable stdout
- out = StringIO.StringIO()
- sys.stdout = out
- # Create config
- self.config = AmbariConfig().getConfig()
- # Instantiate CachedHTTPSConnection (skip connect() call)
- with patch.object(security.VerifiedHTTPSConnection, "connect"):
- self.cachedHTTPSConnection = security.CachedHTTPSConnection(self.config)
- def tearDown(self):
- # enable stdout
- sys.stdout = sys.__stdout__
- ### VerifiedHTTPSConnection ###
- @patch.object(security.CertificateManager, "initSecurity")
- @patch("socket.create_connection")
- @patch("ssl.wrap_socket")
- def test_VerifiedHTTPSConnection_connect(self, wrap_socket_mock,
- create_connection_mock,
- init_security_mock):
- init_security_mock.return_value = None
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- connection = security.VerifiedHTTPSConnection("example.com",
- self.config.get('server', 'secured_url_port'), self.config)
- connection._tunnel_host = False
- connection.sock = None
- connection.connect()
- self.assertTrue(wrap_socket_mock.called)
- ### VerifiedHTTPSConnection with no certificates creation
- @patch.object(security.CertificateManager, "initSecurity")
- @patch("socket.create_connection")
- @patch("ssl.wrap_socket")
- def test_Verified_HTTPSConnection_non_secure_connect(self, wrap_socket_mock,
- create_connection_mock,
- init_security_mock):
- connection = security.VerifiedHTTPSConnection("example.com",
- self.config.get('server', 'secured_url_port'), self.config)
- connection._tunnel_host = False
- connection.sock = None
- connection.connect()
- self.assertFalse(init_security_mock.called)
- ### VerifiedHTTPSConnection with two-way SSL authentication enabled
- @patch.object(security.CertificateManager, "initSecurity")
- @patch("socket.create_connection")
- @patch("ssl.wrap_socket")
- def test_Verified_HTTPSConnection_two_way_ssl_connect(self, wrap_socket_mock,
- create_connection_mock,
- init_security_mock):
- wrap_socket_mock.side_effect=ssl.SSLError()
- connection = security.VerifiedHTTPSConnection("example.com",
- self.config.get('server', 'secured_url_port'), self.config)
- connection._tunnel_host = False
- connection.sock = None
- try:
- connection.connect()
- except ssl.SSLError:
- pass
- self.assertTrue(init_security_mock.called)
- ### CachedHTTPSConnection ###
- @patch.object(security.VerifiedHTTPSConnection, "connect")
- def test_CachedHTTPSConnection_connect(self, vhc_connect_mock):
- self.config.set('server', 'hostname', 'dummy.server.hostname')
- self.config.set('server', 'secured_url_port', '443')
- # Testing not connected case
- self.cachedHTTPSConnection.connected = False
- self.cachedHTTPSConnection.connect()
- self.assertTrue(vhc_connect_mock.called)
- vhc_connect_mock.reset_mock()
- # Testing already connected case
- self.cachedHTTPSConnection.connect()
- self.assertFalse(vhc_connect_mock.called)
- @patch.object(security.CachedHTTPSConnection, "connect")
- def test_forceClear(self, connect_mock):
- # Testing if httpsconn instance changed
- old = self.cachedHTTPSConnection.httpsconn
- self.cachedHTTPSConnection.forceClear()
- self.assertNotEqual(old, self.cachedHTTPSConnection.httpsconn)
- @patch.object(security.CachedHTTPSConnection, "connect")
- def test_request(self, connect_mock):
- httpsconn_mock = MagicMock(create = True)
- self.cachedHTTPSConnection.httpsconn = httpsconn_mock
- dummy_request = MagicMock(create = True)
- dummy_request.get_method.return_value = "dummy_get_method"
- dummy_request.get_full_url.return_value = "dummy_full_url"
- dummy_request.get_data.return_value = "dummy_get_data"
- dummy_request.headers = "dummy_headers"
- responce_mock = MagicMock(create = True)
- responce_mock.read.return_value = "dummy responce"
- httpsconn_mock.getresponse.return_value = responce_mock
- # Testing normal case
- responce = self.cachedHTTPSConnection.request(dummy_request)
- self.assertEqual(responce, responce_mock.read.return_value)
- httpsconn_mock.request.assert_called_once_with(
- dummy_request.get_method.return_value,
- dummy_request.get_full_url.return_value,
- dummy_request.get_data.return_value,
- dummy_request.headers)
- # Testing case of exception
- try:
- def side_eff():
- raise Exception("Dummy exception")
- httpsconn_mock.read.side_effect = side_eff
- responce = self.cachedHTTPSConnection.request(dummy_request)
- self.fail("Should raise IOError")
- except Exception, err:
- # Expected
- pass
- ### CertificateManager ###
- @patch("ambari_agent.hostname.hostname")
- def test_getAgentKeyName(self, hostname_mock):
- hostname_mock.return_value = "dummy.hostname"
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- man = CertificateManager(self.config)
- res = man.getAgentKeyName()
- self.assertEquals(res, "/dummy-keysdir/dummy.hostname.key")
- @patch("ambari_agent.hostname.hostname")
- def test_getAgentCrtName(self, hostname_mock):
- hostname_mock.return_value = "dummy.hostname"
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- man = CertificateManager(self.config)
- res = man.getAgentCrtName()
- self.assertEquals(res, "/dummy-keysdir/dummy.hostname.crt")
- @patch("ambari_agent.hostname.hostname")
- def test_getAgentCrtReqName(self, hostname_mock):
- hostname_mock.return_value = "dummy.hostname"
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- man = CertificateManager(self.config)
- res = man.getAgentCrtReqName()
- self.assertEquals(res, "/dummy-keysdir/dummy.hostname.csr")
- def test_getSrvrCrtName(self):
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- man = CertificateManager(self.config)
- res = man.getSrvrCrtName()
- self.assertEquals(res, "/dummy-keysdir/ca.crt")
- @patch("os.path.exists")
- @patch.object(security.CertificateManager, "loadSrvrCrt")
- @patch.object(security.CertificateManager, "getAgentKeyName")
- @patch.object(security.CertificateManager, "genAgentCrtReq")
- @patch.object(security.CertificateManager, "getAgentCrtName")
- @patch.object(security.CertificateManager, "reqSignCrt")
- def test_checkCertExists(self, reqSignCrt_mock, getAgentCrtName_mock,
- genAgentCrtReq_mock, getAgentKeyName_mock,
- loadSrvrCrt_mock, exists_mock):
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- getAgentKeyName_mock.return_value = "dummy AgentKeyName"
- getAgentCrtName_mock.return_value = "dummy AgentCrtName"
- man = CertificateManager(self.config)
- # Case when all files exist
- exists_mock.side_effect = [True, True, True]
- man.checkCertExists()
- self.assertFalse(loadSrvrCrt_mock.called)
- self.assertFalse(genAgentCrtReq_mock.called)
- self.assertFalse(reqSignCrt_mock.called)
- # Absent server cert
- exists_mock.side_effect = [False, True, True]
- man.checkCertExists()
- self.assertTrue(loadSrvrCrt_mock.called)
- self.assertFalse(genAgentCrtReq_mock.called)
- self.assertFalse(reqSignCrt_mock.called)
- loadSrvrCrt_mock.reset_mock()
- # Absent agent key
- exists_mock.side_effect = [True, False, True]
- man.checkCertExists()
- self.assertFalse(loadSrvrCrt_mock.called)
- self.assertTrue(genAgentCrtReq_mock.called)
- self.assertFalse(reqSignCrt_mock.called)
- genAgentCrtReq_mock.reset_mock()
- # Absent agent cert
- exists_mock.side_effect = [True, True, False]
- man.checkCertExists()
- self.assertFalse(loadSrvrCrt_mock.called)
- self.assertFalse(genAgentCrtReq_mock.called)
- self.assertTrue(reqSignCrt_mock.called)
- reqSignCrt_mock.reset_mock()
- @patch('urllib2.urlopen')
- @patch.object(security.CertificateManager, "getSrvrCrtName")
- def test_loadSrvrCrt(self, getSrvrCrtName_mock, urlopen_mock):
- read_mock = MagicMock(create=True)
- read_mock.read.return_value = "dummy_cert"
- urlopen_mock.return_value = read_mock
- _, tmpoutfile = tempfile.mkstemp()
- getSrvrCrtName_mock.return_value = tmpoutfile
- man = CertificateManager(self.config)
- man.loadSrvrCrt()
- # Checking file contents
- saved = open(tmpoutfile, 'r').read()
- self.assertEqual(saved, read_mock.read.return_value)
- os.unlink(tmpoutfile)
- @patch("ambari_agent.hostname.hostname")
- @patch('__builtin__.open', create=True, autospec=True)
- @patch.dict('os.environ', {'DUMMY_PASSPHRASE': 'dummy-passphrase'})
- @patch('json.dumps')
- @patch('urllib2.Request')
- @patch('urllib2.urlopen')
- @patch('json.loads')
- def test_reqSignCrt(self, loads_mock, urlopen_mock, request_mock, dumps_mock, open_mock, hostname_mock):
- self.config.set('security', 'keysdir', '/dummy-keysdir')
- self.config.set('security', 'passphrase_env_var_name', 'DUMMY_PASSPHRASE')
- man = CertificateManager(self.config)
- hostname_mock.return_value = "dummy-hostname"
- open_mock.return_value.read.return_value = "dummy_request"
- urlopen_mock.return_value.read.return_value = "dummy_server_request"
- loads_mock.return_value = {
- 'result': 'OK',
- 'signedCa': 'dummy-crt'
- }
- # Test normal server interaction
- man.reqSignCrt()
- self.assertEqual(dumps_mock.call_args[0][0], {
- 'csr' : 'dummy_request',
- 'passphrase' : 'dummy-passphrase'
- })
- self.assertEqual(open_mock.return_value.write.call_args[0][0], 'dummy-crt')
- # Test negative server reply
- dumps_mock.reset_mock()
- open_mock.return_value.write.reset_mock()
- loads_mock.return_value = {
- 'result': 'FAIL',
- 'signedCa': 'fail-crt'
- }
- # If certificate signing failed, then exception must be raised
- try:
- man.reqSignCrt()
- self.fail()
- except ssl.SSLError:
- pass
- self.assertFalse(open_mock.return_value.write.called)
- # Test connection fail
- dumps_mock.reset_mock()
- open_mock.return_value.write.reset_mock()
- try:
- man.reqSignCrt()
- self.fail("Expected exception here")
- except Exception, err:
- # expected
- pass
- @patch("subprocess.Popen")
- @patch("subprocess.Popen.communicate")
- def test_genAgentCrtReq(self, communicate_mock, popen_mock):
- man = CertificateManager(self.config)
- p = MagicMock(spec=subprocess.Popen)
- p.communicate = communicate_mock
- popen_mock.return_value = p
- man.genAgentCrtReq()
- self.assertTrue(popen_mock.called)
- self.assertTrue(communicate_mock.called)
- @patch.object(security.CertificateManager, "checkCertExists")
- def test_initSecurity(self, checkCertExists_method):
- man = CertificateManager(self.config)
- man.initSecurity()
- self.assertTrue(checkCertExists_method.called)
|