|
@@ -15,7 +15,8 @@
|
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
|
|
|
|
-
|
|
|
+from StringIO import StringIO
|
|
|
+import gzip
|
|
|
import httplib
|
|
|
import urllib2
|
|
|
import socket
|
|
@@ -31,8 +32,9 @@ import platform
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
-GEN_AGENT_KEY = 'openssl req -new -newkey rsa:1024 -nodes -keyout "%(keysdir)s'+os.sep+'%(hostname)s.key" '\
|
|
|
- '-subj /OU=%(hostname)s/ -out "%(keysdir)s'+os.sep+'%(hostname)s.csr"'
|
|
|
+GEN_AGENT_KEY = 'openssl req -new -newkey rsa:1024 -nodes -keyout "%(keysdir)s' \
|
|
|
+ + os.sep + '%(hostname)s.key" -subj /OU=%(hostname)s/ ' \
|
|
|
+ '-out "%(keysdir)s' + os.sep + '%(hostname)s.csr"'
|
|
|
|
|
|
|
|
|
class VerifiedHTTPSConnection(httplib.HTTPSConnection):
|
|
@@ -44,9 +46,11 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
|
|
|
|
|
|
def connect(self):
|
|
|
self.two_way_ssl_required = self.config.isTwoWaySSLConnection()
|
|
|
- logger.debug("Server two-way SSL authentication required: %s" % str(self.two_way_ssl_required))
|
|
|
+ logger.debug("Server two-way SSL authentication required: %s" % str(
|
|
|
+ self.two_way_ssl_required))
|
|
|
if self.two_way_ssl_required is True:
|
|
|
- logger.info('Server require two-way SSL authentication. Use it instead of one-way...')
|
|
|
+ logger.info(
|
|
|
+ 'Server require two-way SSL authentication. Use it instead of one-way...')
|
|
|
|
|
|
if not self.two_way_ssl_required:
|
|
|
try:
|
|
@@ -56,8 +60,9 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
|
|
|
'turned off on the server.')
|
|
|
except (ssl.SSLError, AttributeError):
|
|
|
self.two_way_ssl_required = True
|
|
|
- logger.info('Insecure connection to https://' + self.host + ':' + self.port +
|
|
|
- '/ failed. Reconnecting using two-way SSL authentication..')
|
|
|
+ logger.info(
|
|
|
+ 'Insecure connection to https://' + self.host + ':' + self.port +
|
|
|
+ '/ failed. Reconnecting using two-way SSL authentication..')
|
|
|
|
|
|
if self.two_way_ssl_required:
|
|
|
self.certMan = CertificateManager(self.config)
|
|
@@ -70,21 +75,21 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
|
|
|
|
|
|
try:
|
|
|
self.sock = ssl.wrap_socket(sock,
|
|
|
- keyfile=agent_key,
|
|
|
- certfile=agent_crt,
|
|
|
- cert_reqs=ssl.CERT_REQUIRED,
|
|
|
- ca_certs=server_crt)
|
|
|
+ keyfile=agent_key,
|
|
|
+ certfile=agent_crt,
|
|
|
+ cert_reqs=ssl.CERT_REQUIRED,
|
|
|
+ ca_certs=server_crt)
|
|
|
logger.info('SSL connection established. Two-way SSL authentication '
|
|
|
'completed successfully.')
|
|
|
except ssl.SSLError as err:
|
|
|
logger.error('Two-way SSL authentication failed. Ensure that '
|
|
|
- 'server and agent certificates were signed by the same CA '
|
|
|
- 'and restart the agent. '
|
|
|
- '\nIn order to receive a new agent certificate, remove '
|
|
|
- 'existing certificate file from keys directory. As a '
|
|
|
- 'workaround you can turn off two-way SSL authentication in '
|
|
|
- 'server configuration(ambari.properties) '
|
|
|
- '\nExiting..')
|
|
|
+ 'server and agent certificates were signed by the same CA '
|
|
|
+ 'and restart the agent. '
|
|
|
+ '\nIn order to receive a new agent certificate, remove '
|
|
|
+ 'existing certificate file from keys directory. As a '
|
|
|
+ 'workaround you can turn off two-way SSL authentication in '
|
|
|
+ 'server configuration(ambari.properties) '
|
|
|
+ '\nExiting..')
|
|
|
raise err
|
|
|
|
|
|
def create_connection(self):
|
|
@@ -112,13 +117,15 @@ class CachedHTTPSConnection:
|
|
|
|
|
|
def connect(self):
|
|
|
if not self.connected:
|
|
|
- self.httpsconn = VerifiedHTTPSConnection(self.server, self.port, self.config)
|
|
|
+ self.httpsconn = VerifiedHTTPSConnection(self.server, self.port,
|
|
|
+ self.config)
|
|
|
self.httpsconn.connect()
|
|
|
self.connected = True
|
|
|
# possible exceptions are caught and processed in Controller
|
|
|
|
|
|
def forceClear(self):
|
|
|
- self.httpsconn = VerifiedHTTPSConnection(self.server, self.port, self.config)
|
|
|
+ self.httpsconn = VerifiedHTTPSConnection(self.server, self.port,
|
|
|
+ self.config)
|
|
|
self.connect()
|
|
|
|
|
|
def request(self, req):
|
|
@@ -127,6 +134,10 @@ class CachedHTTPSConnection:
|
|
|
self.httpsconn.request(req.get_method(), req.get_full_url(),
|
|
|
req.get_data(), req.headers)
|
|
|
response = self.httpsconn.getresponse()
|
|
|
+ # Ungzip if gzipped
|
|
|
+ if response.getheader('Content-Encoding') == 'gzip':
|
|
|
+ buf = StringIO(response.read())
|
|
|
+ response = gzip.GzipFile(fileobj=buf)
|
|
|
readResponse = response.read()
|
|
|
except Exception as ex:
|
|
|
# This exception is caught later in Controller
|
|
@@ -144,7 +155,7 @@ class CertificateManager():
|
|
|
self.keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
|
|
|
self.server_crt = self.config.get('security', 'server_crt')
|
|
|
self.server_url = 'https://' + hostname.server_hostname(config) + ':' \
|
|
|
- + self.config.get('server', 'url_port')
|
|
|
+ + self.config.get('server', 'url_port')
|
|
|
|
|
|
def getAgentKeyName(self):
|
|
|
keysdir = os.path.abspath(self.config.get('security', 'keysdir'))
|
|
@@ -164,7 +175,8 @@ class CertificateManager():
|
|
|
|
|
|
def checkCertExists(self):
|
|
|
|
|
|
- s = os.path.abspath(self.config.get('security', 'keysdir')) + os.sep + "ca.crt"
|
|
|
+ s = os.path.abspath(
|
|
|
+ self.config.get('security', 'keysdir')) + os.sep + "ca.crt"
|
|
|
|
|
|
server_crt_exists = os.path.exists(s)
|
|
|
|
|
@@ -202,18 +214,20 @@ class CertificateManager():
|
|
|
srvr_crt_f.write(response)
|
|
|
|
|
|
def reqSignCrt(self):
|
|
|
- sign_crt_req_url = self.server_url + '/certs/' + hostname.hostname(self.config)
|
|
|
+ sign_crt_req_url = self.server_url + '/certs/' + hostname.hostname(
|
|
|
+ self.config)
|
|
|
agent_crt_req_f = open(self.getAgentCrtReqName())
|
|
|
agent_crt_req_content = agent_crt_req_f.read()
|
|
|
passphrase_env_var = self.config.get('security', 'passphrase_env_var_name')
|
|
|
passphrase = os.environ[passphrase_env_var]
|
|
|
register_data = {'csr': agent_crt_req_content,
|
|
|
- 'passphrase': passphrase}
|
|
|
+ 'passphrase': passphrase}
|
|
|
data = json.dumps(register_data)
|
|
|
proxy_handler = urllib2.ProxyHandler({})
|
|
|
opener = urllib2.build_opener(proxy_handler)
|
|
|
urllib2.install_opener(opener)
|
|
|
- req = urllib2.Request(sign_crt_req_url, data, {'Content-Type': 'application/json'})
|
|
|
+ req = urllib2.Request(sign_crt_req_url, data,
|
|
|
+ {'Content-Type': 'application/json'})
|
|
|
f = urllib2.urlopen(req)
|
|
|
response = f.read()
|
|
|
f.close()
|
|
@@ -239,14 +253,16 @@ class CertificateManager():
|
|
|
raise ssl.SSLError
|
|
|
|
|
|
def genAgentCrtReq(self):
|
|
|
- generate_script = GEN_AGENT_KEY % {'hostname': hostname.hostname(self.config),
|
|
|
- 'keysdir' : os.path.abspath(self.config.get('security', 'keysdir'))}
|
|
|
+ generate_script = GEN_AGENT_KEY % {
|
|
|
+ 'hostname': hostname.hostname(self.config),
|
|
|
+ 'keysdir': os.path.abspath(self.config.get('security', 'keysdir'))}
|
|
|
logger.info(generate_script)
|
|
|
if platform.system() == 'Windows':
|
|
|
p = subprocess.Popen(generate_script, stdout=subprocess.PIPE)
|
|
|
p.communicate()
|
|
|
else:
|
|
|
- p = subprocess.Popen([generate_script], shell=True, stdout=subprocess.PIPE)
|
|
|
+ p = subprocess.Popen([generate_script], shell=True,
|
|
|
+ stdout=subprocess.PIPE)
|
|
|
p.communicate()
|
|
|
|
|
|
def initSecurity(self):
|