123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- #!/usr/bin/env python
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import httplib
- import urllib2
- import socket
- import ssl
- import os
- import logging
- import subprocess
- import json
- import pprint
- import traceback
- import hostname
- import platform
# Module-wide logger; getLogger() without a name returns the root logger.
logger = logging.getLogger()

# Command template used to create the agent's private key and certificate
# signing request. It is filled with a mapping that provides ``keysdir`` and
# ``hostname``; the path separator is baked in once, at import time.
# NOTE(review): rsa:1024 is a small key size by current standards - confirm
# with the server-side signing policy before changing it.
GEN_AGENT_KEY = (
    'openssl req -new -newkey rsa:1024 -nodes '
    '-keyout "%(keysdir)s{sep}%(hostname)s.key" '
    '-subj /OU=%(hostname)s/ '
    '-out "%(keysdir)s{sep}%(hostname)s.csr"'
).format(sep=os.sep)
class VerifiedHTTPSConnection(httplib.HTTPSConnection):
    """ Connecting using ssl wrapped sockets

    connect() asks the agent configuration whether two-way SSL is required.
    When it is not, a one-way SSL connection (no certificate verification) is
    tried first. If that attempt fails, or if two-way SSL is required, the
    socket is wrapped with the agent key/certificate and the server CA
    certificate whose file names come from CertificateManager.
    """

    def __init__(self, host, port=None, config=None):
        """
        host   -- server host name
        port   -- server port, passed through to httplib.HTTPSConnection
        config -- agent configuration; connect() calls its
                  isTwoWaySSLConnection() method and hands it to
                  CertificateManager
        """
        httplib.HTTPSConnection.__init__(self, host, port=port)
        self.two_way_ssl_required = False
        self.config = config

    def connect(self):
        """ Open the socket and wrap it with SSL, storing it in self.sock.

        Raises ssl.SSLError when the two-way SSL handshake fails.
        """
        self.two_way_ssl_required = self.config.isTwoWaySSLConnection()
        logger.debug("Server two-way SSL authentication required: %s" % str(self.two_way_ssl_required))
        if self.two_way_ssl_required is True:
            logger.info('Server require two-way SSL authentication. Use it instead of one-way...')

        if not self.two_way_ssl_required:
            sock = None
            try:
                sock = self.create_connection()
                # NOTE(review): CERT_NONE means the server certificate is not
                # verified on the one-way path.
                self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_NONE)
                logger.info('SSL connection established. Two-way SSL authentication is '
                            'turned off on the server.')
            except (ssl.SSLError, AttributeError):
                # Do not leak the raw socket of the failed attempt.
                self._close_quietly(sock)
                self.two_way_ssl_required = True
                # %-formatting instead of '+': self.port is not guaranteed to
                # be a string (httplib derives an int when port is None).
                logger.info('Insecure connection to https://%s:%s'
                            '/ failed. Reconnecting using two-way SSL authentication..'
                            % (self.host, self.port))

        if self.two_way_ssl_required:
            self.certMan = CertificateManager(self.config)
            self.certMan.initSecurity()
            agent_key = self.certMan.getAgentKeyName()
            agent_crt = self.certMan.getAgentCrtName()
            server_crt = self.certMan.getSrvrCrtName()

            sock = self.create_connection()

            try:
                self.sock = ssl.wrap_socket(sock,
                                            keyfile=agent_key,
                                            certfile=agent_crt,
                                            cert_reqs=ssl.CERT_REQUIRED,
                                            ca_certs=server_crt)
                logger.info('SSL connection established. Two-way SSL authentication '
                            'completed successfully.')
            except ssl.SSLError:
                self._close_quietly(sock)
                logger.error('Two-way SSL authentication failed. Ensure that '
                             'server and agent certificates were signed by the same CA '
                             'and restart the agent. '
                             '\nIn order to receive a new agent certificate, remove '
                             'existing certificate file from keys directory. As a '
                             'workaround you can turn off two-way SSL authentication in '
                             'server configuration(ambari.properties) '
                             '\nExiting..')
                # Bare raise keeps the original traceback.
                raise

    def create_connection(self):
        """ Close any existing socket and return a new plain TCP socket.

        The socket is created with a 60 second timeout and SO_KEEPALIVE
        enabled. When a tunnel host is configured the socket is also stored
        in self.sock and the tunnel is set up before returning.
        """
        if self.sock:
            self.sock.close()
        logger.info("SSL Connect being called.. connecting to the server")
        sock = socket.create_connection((self.host, self.port), 60)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        if self._tunnel_host:
            self.sock = sock
            self._tunnel()

        return sock

    @staticmethod
    def _close_quietly(sock):
        """ Close sock if there is one, ignoring socket errors. """
        if sock is None:
            return
        try:
            sock.close()
        except socket.error:
            pass
class CachedHTTPSConnection:
    """ Caches a ssl socket and uses a single https connection to the server. """

    def __init__(self, config):
        """ Read server host/port from config and connect immediately.

        config -- agent configuration; must provide get('server',
                  'secured_url_port') and is passed to
                  hostname.server_hostname() and VerifiedHTTPSConnection
        """
        self.connected = False
        self.config = config
        self.server = hostname.server_hostname(config)
        self.port = config.get('server', 'secured_url_port')
        self.connect()

    def connect(self):
        """ Create and connect a new connection unless one is already up. """
        if not self.connected:
            self.httpsconn = VerifiedHTTPSConnection(self.server, self.port, self.config)
            self.httpsconn.connect()
            self.connected = True
        # possible exceptions are caught and processed in Controller

    def forceClear(self):
        """ Drop the cached connection and establish a fresh one.

        The previous connection is closed first so its socket is not leaked,
        and the connected flag is reset so connect() really reconnects.
        """
        self._discard_connection()
        self.connect()

    def request(self, req):
        """ Send req over the cached connection and return the response body.

        req -- request object providing get_method(), get_full_url(),
               get_data() and a headers attribute

        Raises IOError on any communication failure; the cached connection
        is closed and marked as disconnected so the next call reconnects.
        """
        self.connect()
        try:
            self.httpsconn.request(req.get_method(), req.get_full_url(),
                                   req.get_data(), req.headers)
            response = self.httpsconn.getresponse()
            readResponse = response.read()
        except Exception as ex:
            # This exception is caught later in Controller
            logger.debug("Error in sending/receving data from the server " +
                         traceback.format_exc())
            logger.info("Encountered communication error. Details: " + repr(ex))
            self._discard_connection()
            raise IOError("Error occured during connecting to the server: " + str(ex))
        return readResponse

    def _discard_connection(self):
        """ Mark the connection as down and close it, ignoring close errors. """
        self.connected = False
        conn = getattr(self, 'httpsconn', None)
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # Best effort: the connection is being thrown away anyway.
            logger.debug("Error while closing stale connection " +
                         traceback.format_exc())
class CertificateManager():
    """ Makes sure the files needed for two-way SSL exist in the keys directory.

    The server CA certificate (ca.crt), the agent private key and the agent
    certificate are looked up under the 'keysdir' configured in the
    'security' section; whatever is missing is downloaded, generated with
    openssl, or requested from the server.
    """

    def __init__(self, config):
        """
        config -- agent configuration; read via get(section, key) and passed
                  to the hostname helpers
        """
        self.config = config
        self.keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
        self.server_crt = self.config.get('security', 'server_crt')
        self.server_url = 'https://' + hostname.server_hostname(config) + ':' \
            + self.config.get('server', 'url_port')

    def _keysPath(self, file_name):
        """ Return file_name located in the configured keys directory. """
        # keysdir is re-read from config on every call, as the getters
        # always did.
        keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
        return keysdir + os.sep + file_name

    def getAgentKeyName(self):
        """ Path of the agent private key (<hostname>.key). """
        return self._keysPath(hostname.hostname(self.config) + ".key")

    def getAgentCrtName(self):
        """ Path of the agent certificate (<hostname>.crt). """
        return self._keysPath(hostname.hostname(self.config) + ".crt")

    def getAgentCrtReqName(self):
        """ Path of the agent certificate signing request (<hostname>.csr). """
        return self._keysPath(hostname.hostname(self.config) + ".csr")

    def getSrvrCrtName(self):
        """ Path of the server CA certificate (ca.crt). """
        return self._keysPath("ca.crt")

    def checkCertExists(self):
        """ Create each missing security file, in dependency order.

        Server certificate first, then the agent key/CSR, then the signed
        agent certificate.
        """
        server_crt_exists = os.path.exists(self.getSrvrCrtName())

        if not server_crt_exists:
            logger.info("Server certicate not exists, downloading")
            self.loadSrvrCrt()
        else:
            logger.info("Server certicate exists, ok")

        agent_key_exists = os.path.exists(self.getAgentKeyName())

        if not agent_key_exists:
            logger.info("Agent key not exists, generating request")
            self.genAgentCrtReq()
        else:
            logger.info("Agent key exists, ok")

        agent_crt_exists = os.path.exists(self.getAgentCrtName())

        if not agent_crt_exists:
            logger.info("Agent certificate not exists, sending sign request")
            self.reqSignCrt()
        else:
            logger.info("Agent certificate exists, ok")

    def loadSrvrCrt(self):
        """ Download the server CA certificate and store it as ca.crt. """
        get_ca_url = self.server_url + '/cert/ca/'
        logger.info("Downloading server cert from " + get_ca_url)
        # Empty ProxyHandler mapping: no proxies are configured for this opener.
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        stream = opener.open(get_ca_url)
        try:
            response = stream.read()
        finally:
            stream.close()
        # 'with' guarantees the certificate is flushed and closed before use.
        with open(self.getSrvrCrtName(), 'w+') as srvr_crt_f:
            srvr_crt_f.write(response)

    def reqSignCrt(self):
        """ Send the agent CSR to the server and store the signed certificate.

        The passphrase is read from the environment variable named by the
        'passphrase_env_var_name' option of the 'security' section.

        Raises KeyError when that environment variable is not set and
        ssl.SSLError when the server does not answer with result 'OK'.
        """
        sign_crt_req_url = self.server_url + '/certs/' + hostname.hostname(self.config)
        with open(self.getAgentCrtReqName()) as agent_crt_req_f:
            agent_crt_req_content = agent_crt_req_f.read()
        passphrase_env_var = self.config.get('security', 'passphrase_env_var_name')
        passphrase = os.environ[passphrase_env_var]
        register_data = {'csr': agent_crt_req_content,
                         'passphrase': passphrase}
        data = json.dumps(register_data)
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        req = urllib2.Request(sign_crt_req_url, data, {'Content-Type': 'application/json'})
        f = urllib2.urlopen(req)
        try:
            response = f.read()
        finally:
            f.close()
        try:
            data = json.loads(response)
            logger.debug("Sign response from Server: \n" + pprint.pformat(data))
        except (ValueError, TypeError):
            # Log what the server sent. 'data' still holds the request body
            # here, which contains the passphrase and must not be logged.
            logger.warn("Malformed response! data: " + str(response))
            data = {'result': 'ERROR'}
        # A reply without a 'result' field is treated as a failed signing.
        result = data.get('result') if isinstance(data, dict) else None
        if result == 'OK':
            agentCrtContent = data['signedCa']
            with open(self.getAgentCrtName(), "w") as agentCrtF:
                agentCrtF.write(agentCrtContent)
        else:
            # Possible exception is catched higher at Controller
            logger.error('Certificate signing failed.'
                         '\nIn order to receive a new agent'
                         ' certificate, remove existing certificate file from keys '
                         'directory. As a workaround you can turn off two-way SSL '
                         'authentication in server configuration(ambari.properties) '
                         '\nExiting..')
            raise ssl.SSLError

    def genAgentCrtReq(self):
        """ Run openssl to generate the agent private key and CSR. """
        generate_script = GEN_AGENT_KEY % {'hostname': hostname.hostname(self.config),
                                           'keysdir': os.path.abspath(self.config.get('security', 'keysdir'))}
        logger.info(generate_script)
        # NOTE(review): the command line is built by string interpolation and
        # run through the shell on non-Windows hosts; hostname and keysdir
        # must not contain shell metacharacters.
        if platform.system() == 'Windows':
            p = subprocess.Popen(generate_script, stdout=subprocess.PIPE)
        else:
            p = subprocess.Popen([generate_script], shell=True, stdout=subprocess.PIPE)
        p.communicate()
        if p.returncode != 0:
            # Stay best-effort, but leave a trace instead of failing silently.
            logger.warn("openssl exited with code " + str(p.returncode) +
                        " while generating the agent key")

    def initSecurity(self):
        """ Entry point: ensure all certificate/key files are present. """
        self.checkCertExists()
|