|
@@ -20,9 +20,9 @@ limitations under the License.
|
|
|
|
|
|
import sys
|
|
|
from resource_management import *
|
|
|
-
|
|
|
from hive import hive
|
|
|
from hive_service import hive_service
|
|
|
+import os
|
|
|
|
|
|
class HiveServer(Script):
|
|
|
|
|
@@ -39,6 +39,7 @@ class HiveServer(Script):
|
|
|
import params
|
|
|
env.set_params(params)
|
|
|
self.configure(env) # FOR SECURITY
|
|
|
+ self.install_tez_jars(params) # Put tez jars in hdfs
|
|
|
hive_service( 'hiveserver2',
|
|
|
action = 'start'
|
|
|
)
|
|
@@ -51,7 +52,6 @@ class HiveServer(Script):
|
|
|
action = 'stop'
|
|
|
)
|
|
|
|
|
|
-
|
|
|
def status(self, env):
|
|
|
import status_params
|
|
|
env.set_params(status_params)
|
|
@@ -59,5 +59,126 @@ class HiveServer(Script):
|
|
|
# Recursively check all existing gmetad pid files
|
|
|
check_process_status(pid_file)
|
|
|
|
|
|
+ def install_tez_jars(self, params):
|
|
|
+ destination_hdfs_dirs = get_tez_hdfs_dir_paths(params.tez_lib_uris)
|
|
|
+
|
|
|
+ # If tez libraries are to be stored in hdfs
|
|
|
+ if destination_hdfs_dirs:
|
|
|
+ for hdfs_dir in destination_hdfs_dirs:
|
|
|
+ params.HdfsDirectory(hdfs_dir,
|
|
|
+ action="create_delayed",
|
|
|
+ owner=params.tez_user,
|
|
|
+ mode=0755
|
|
|
+ )
|
|
|
+ pass
|
|
|
+ params.HdfsDirectory(None, action="create")
|
|
|
+
|
|
|
+ if params.security_enabled:
|
|
|
+ kinit_if_needed = format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_user};")
|
|
|
+ else:
|
|
|
+ kinit_if_needed = ""
|
|
|
+
|
|
|
+ if kinit_if_needed:
|
|
|
+ Execute(kinit_if_needed,
|
|
|
+ user=params.tez_user,
|
|
|
+ path='/bin'
|
|
|
+ )
|
|
|
+
|
|
|
+ app_dir_path = None
|
|
|
+ lib_dir_path = None
|
|
|
+
|
|
|
+ if len(destination_hdfs_dirs) > 1:
|
|
|
+ for path in destination_hdfs_dirs:
|
|
|
+ if 'lib' in path:
|
|
|
+ lib_dir_path = path
|
|
|
+ else:
|
|
|
+ app_dir_path = path
|
|
|
+ pass
|
|
|
+ pass
|
|
|
+ pass
|
|
|
+
|
|
|
+ if app_dir_path:
|
|
|
+ copyFromLocal(path=params.tez_local_api_jars,
|
|
|
+ mode=0655,
|
|
|
+ owner=params.tez_user,
|
|
|
+ dest_dir=app_dir_path,
|
|
|
+ kinnit_if_needed=kinit_if_needed,
|
|
|
+ stub_path=params.tez_stub_path
|
|
|
+ )
|
|
|
+ pass
|
|
|
+
|
|
|
+ if lib_dir_path:
|
|
|
+ copyFromLocal(path=params.tez_local_lib_jars,
|
|
|
+ mode=0655,
|
|
|
+ owner=params.tez_user,
|
|
|
+ dest_dir=lib_dir_path,
|
|
|
+ kinnit_if_needed=kinit_if_needed,
|
|
|
+ stub_path=params.tez_stub_path
|
|
|
+ )
|
|
|
+ pass
|
|
|
+
|
|
|
+ ExecuteHadoop(format('dfs -touchz {tez_stub_path}'),
|
|
|
+ user = params.tez_user,
|
|
|
+ conf_dir=params.hadoop_conf_dir
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def get_tez_hdfs_dir_paths(tez_lib_uris = None):
|
|
|
+ hdfs_path_prefix = 'hdfs://'
|
|
|
+ lib_dir_paths = []
|
|
|
+ if tez_lib_uris and tez_lib_uris.strip().find(hdfs_path_prefix, 0) != -1:
|
|
|
+ dir_paths = tez_lib_uris.split(',')
|
|
|
+ for path in dir_paths:
|
|
|
+ lib_dir_path = path.replace(hdfs_path_prefix, '')
|
|
|
+ lib_dir_path = lib_dir_path if lib_dir_path.endswith(os.sep) else lib_dir_path + os.sep
|
|
|
+ lib_dir_paths.append(lib_dir_path)
|
|
|
+ pass
|
|
|
+ pass
|
|
|
+
|
|
|
+ return lib_dir_paths
|
|
|
+
|
|
|
+
|
|
|
+def copyFromLocal(path=None, owner=None, group=None, mode=None,
|
|
|
+ dest_dir=None, kinnit_if_needed="", stub_path=None):
|
|
|
+ import params
|
|
|
+
|
|
|
+ if not stub_path:
|
|
|
+ stub_path = params.tez_stub_path
|
|
|
+ pass
|
|
|
+
|
|
|
+ copy_cmd = format("fs -copyFromLocal {path} {dest_dir}")
|
|
|
+ unless_cmd = format("{kinnit_if_needed} hadoop fs -ls {stub_path} >/dev/null 2>&1")
|
|
|
+
|
|
|
+ ExecuteHadoop(copy_cmd,
|
|
|
+ not_if=unless_cmd,
|
|
|
+ user=owner,
|
|
|
+ conf_dir=params.hadoop_conf_dir,
|
|
|
+ ignore_failures = True)
|
|
|
+
|
|
|
+ if not owner:
|
|
|
+ chown = None
|
|
|
+ else:
|
|
|
+ if not group:
|
|
|
+ chown = owner
|
|
|
+ else:
|
|
|
+ chown = format('{owner}:{group}')
|
|
|
+
|
|
|
+ if not chown:
|
|
|
+ chown_cmd = format("fs -chown {chown} {dest_dir}")
|
|
|
+
|
|
|
+ ExecuteHadoop(copy_cmd,
|
|
|
+ user=owner,
|
|
|
+ conf_dir=params.hadoop_conf_dir)
|
|
|
+ pass
|
|
|
+
|
|
|
+ if not mode:
|
|
|
+ chmod_cmd = format('fs -chmod {mode} {dest_dir}')
|
|
|
+
|
|
|
+ ExecuteHadoop(chmod_cmd,
|
|
|
+ user=owner,
|
|
|
+ conf_dir=params.hadoop_conf_dir)
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
|
HiveServer().execute()
|