|
@@ -0,0 +1,157 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+
|
|
|
+"""
|
|
|
+Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+or more contributor license agreements. See the NOTICE file
|
|
|
+distributed with this work for additional information
|
|
|
+regarding copyright ownership. The ASF licenses this file
|
|
|
+to you under the Apache License, Version 2.0 (the
|
|
|
+"License"); you may not use this file except in compliance
|
|
|
+with the License. You may obtain a copy of the License at
|
|
|
+
|
|
|
+ http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+
|
|
|
+Unless required by applicable law or agreed to in writing, software
|
|
|
+distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+See the License for the specific language governing permissions and
|
|
|
+limitations under the License.
|
|
|
+"""
|
|
|
+
|
|
|
+import logging
|
|
|
+import json
|
|
|
+import socket
|
|
|
+import urllib2
|
|
|
+import urllib
|
|
|
+
|
|
|
+from resource_management.libraries.functions.curl_krb_request import curl_krb_request
|
|
|
+from resource_management.core.environment import Environment
|
|
|
+
|
|
|
# Configuration property tokens: Jinja-style placeholders that the Ambari
# alert framework resolves and passes to execute() via `configurations`.
CLUSTER_ENV_SECURITY = '{{cluster-env/security_enabled}}'
HADOOP_ENV_HDFS_USER = '{{hadoop-env/hdfs_user}}'
HADOOP_ENV_HDFS_USER_KEYTAB = '{{hadoop-env/hdfs_user_keytab}}'
HADOOP_ENV_HDFS_PRINCIPAL_NAME = '{{hadoop-env/hdfs_principal_name}}'
HDFS_SITE_DFS_NAMENODE_HTTP_ADDRESS = '{{hdfs-site/dfs.namenode.http-address}}'


# Alert result states understood by the Ambari alert framework.
RESULT_STATE_OK = 'OK'
RESULT_STATE_WARNING = 'WARNING'

# Port the local PXF REST service listens on.
PXF_PORT = 51200

# Base URL of the local PXF REST API.
BASE_URL = "http://localhost:" + str(PXF_PORT) + "/pxf/"

logger = logging.getLogger('ambari_alerts')

# Headers sent with every PXF REST request; presumably mimics a minimal
# Greenplum segment request so the PXF service accepts it — TODO confirm
# against the PXF REST API contract.
commonPXFHeaders = {
    "X-GP-SEGMENT-COUNT": "1",
    "X-GP-URL-PORT": PXF_PORT,
    "X-GP-SEGMENT-ID": "-1",
    "X-GP-HAS-FILTER": "0",
    "Accept": "application/json",
    "X-GP-ALIGNMENT": "8",
    "X-GP-ATTRS": "0",
    "X-GP-FORMAT": "TEXT",
    "X-GP-URL-HOST": "localhost"
  }
|
|
|
+
|
|
|
+
|
|
|
def get_tokens():
  """
  Return the tuple of configuration property tokens that the Ambari
  framework must resolve and supply to execute() in `configurations`.
  """
  required_props = (
    CLUSTER_ENV_SECURITY,
    HADOOP_ENV_HDFS_USER,
    HADOOP_ENV_HDFS_USER_KEYTAB,
    HADOOP_ENV_HDFS_PRINCIPAL_NAME,
    HDFS_SITE_DFS_NAMENODE_HTTP_ADDRESS,
  )
  return required_props
|
|
|
+
|
|
|
def _get_delegation_token(namenode_address, user, keytab, principal, kinit_path):
  """
  Gets an HDFS delegation token for *user* from the NameNode's WebHDFS
  endpoint, authenticating via Kerberos (kinit with the given keytab and
  principal).

  :param namenode_address: http address of the NameNode (host:port or URL)
  :param user: HDFS user the token is requested for
  :param keytab: path to the keytab used to kinit
  :param principal: Kerberos principal used to kinit
  :param kinit_path: path to the kinit executable (None lets the library resolve it)
  :return: the delegation token's urlString
  :raises Exception: if the WebHDFS response does not contain a token
  """
  url = namenode_address + "/webhdfs/v1/?op=GETDELEGATIONTOKEN"
  logger.info("Getting delegation token from {0} for PXF".format(url))
  response, _, _ = curl_krb_request(Environment.get_instance().tmp_dir,
                                    keytab,
                                    principal,
                                    url,
                                    "get_delegation_token",
                                    kinit_path,
                                    False,
                                    "Delegation Token",
                                    user)
  json_response = json.loads(response)
  # Use .get() so a malformed/unexpected response reaches the descriptive
  # error below instead of raising an opaque KeyError here.
  token = json_response.get('Token')
  if token and token.get('urlString'):
    return token['urlString']

  msg = "Unable to get delegation token for PXF"
  logger.error(msg)
  raise Exception(msg)
|
|
|
+
|
|
|
def _makeHTTPCall(url, header=None, body=None):
  """
  Performs an HTTP request against *url*: GET by default, POST when *body*
  is given (the body dict is url-encoded as form data).

  :param url: target URL
  :param header: optional dict of HTTP request headers
  :param body: optional dict sent as url-encoded POST data
  :return: the raw response body as a string
  :raises urllib2.URLError: if the request fails (details logged first)
  """
  # Avoid the mutable-default-argument pitfall: default the header dict
  # inside the function body.
  if header is None:
    header = {}

  # Timeout in seconds. Passed per-request instead of calling
  # socket.setdefaulttimeout(), which would mutate process-wide state for
  # every subsequent socket in this interpreter.
  timeout = 10

  try:
    data = None
    if body:
      data = urllib.urlencode(body)
    req = urllib2.Request(url, data, header)

    response = urllib2.urlopen(req, timeout=timeout)
    return response.read()
  except urllib2.URLError as e:
    # URLError carries .reason for connection failures; HTTPError (a
    # subclass) carries .code for HTTP-level failures.
    if hasattr(e, 'reason'):
      logger.error('Reason: ' + str(e.reason))
    if hasattr(e, 'code'):
      logger.error('Error code: ' + str(e.code))
    raise e
|
|
|
+
|
|
|
+
|
|
|
def _get_pxf_protocol_version():
  """
  Gets the PXF protocol version string (e.g. "v14") reported by the local
  PXF service's ProtocolVersion endpoint.

  :return: the version token, e.g. "v14"
  :raises Exception: if the endpoint is unreachable or the version cannot
                     be parsed out of the response
  """
  logger.info("Fetching PXF protocol version")
  url = BASE_URL + "ProtocolVersion"
  try:
    response = _makeHTTPCall(url)
  except Exception as e:
    # BUG FIX: .reason is URLError-specific; a generic exception caught
    # here would raise AttributeError on e.reason and mask the real error.
    # str(e) is safe for every exception type.
    raise Exception("URL: " + url + " is not accessible. " + str(e))

  logger.info(response)
  # Sample response: 'PXF protocol version v14'
  if response:
    import re
    # Extract the version token (e.g. "v14") from the response text.
    match = re.search(r'.*(v\d*).*', response)
    if match:
      return match.group(1)

  raise Exception("version could not be found in response " + response)
|
|
|
+
|
|
|
def execute(configurations={}, parameters={}, host_name=None):
  """
  Ambari alert entry point: checks that the local PXF service is
  functional by querying its protocol version.

  :param configurations: resolved configuration properties, keyed by the
                         tokens returned from get_tokens()
  :param parameters: alert parameters (unused)
  :param host_name: host the alert runs on; used in the failure message
  :return: (result_state, [message]) tuple for the alert framework
  """
  try:
    # On secure clusters PXF needs an HDFS delegation token; fetch one
    # from the NameNode and attach it to the common request headers.
    if CLUSTER_ENV_SECURITY in configurations and configurations[CLUSTER_ENV_SECURITY].lower() == "true":
      namenode_address = configurations[HDFS_SITE_DFS_NAMENODE_HTTP_ADDRESS]

      token = _get_delegation_token(namenode_address,
                                    configurations[HADOOP_ENV_HDFS_USER],
                                    configurations[HADOOP_ENV_HDFS_USER_KEYTAB],
                                    configurations[HADOOP_ENV_HDFS_PRINCIPAL_NAME],
                                    None)
      commonPXFHeaders.update({"X-GP-TOKEN": token})

    if _get_pxf_protocol_version().startswith("v"):
      return (RESULT_STATE_OK, ['PXF is functional'])

    message = "Unable to determine PXF version"
    # BUG FIX: logger.exception() is only meaningful inside an except
    # handler (it logs the active exception, here "NoneType: None");
    # use logger.error() before raising.
    logger.error(message)
    raise Exception(message)

  except Exception as e:
    message = 'PXF is not functional on host, {0}: {1}'.format(host_name, e)
    logger.exception(message)
    return (RESULT_STATE_WARNING, [message])
|
|
|
+
|