|
@@ -16,6 +16,7 @@ See the License for the specific language governing permissions and
|
|
|
limitations under the License.
|
|
|
|
|
|
"""
|
|
|
+import os.path
|
|
|
|
|
|
from resource_management import *
|
|
|
from resource_management.core.exceptions import ComponentIsNotRunning
|
|
@@ -152,8 +153,8 @@ def create_hdfs_directories(check):
|
|
|
def format_namenode(force=None):
|
|
|
import params
|
|
|
|
|
|
- old_mark_dir = params.namenode_formatted_old_mark_dir
|
|
|
- mark_dir = params.namenode_formatted_mark_dir
|
|
|
+ old_mark_dir = params.namenode_formatted_old_mark_dirs
|
|
|
+ mark_dir = params.namenode_formatted_mark_dirs
|
|
|
dfs_name_dir = params.dfs_name_dir
|
|
|
hdfs_user = params.hdfs_user
|
|
|
hadoop_conf_dir = params.hadoop_conf_dir
|
|
@@ -165,38 +166,89 @@ def format_namenode(force=None):
|
|
|
bin_dir=params.hadoop_bin_dir,
|
|
|
conf_dir=hadoop_conf_dir)
|
|
|
else:
|
|
|
- File(format("{tmp_dir}/checkForFormat.sh"),
|
|
|
- content=StaticFile("checkForFormat.sh"),
|
|
|
- mode=0755)
|
|
|
- Execute(format(
|
|
|
- "{tmp_dir}/checkForFormat.sh {hdfs_user} {hadoop_conf_dir} "
|
|
|
- "{hadoop_bin_dir} {old_mark_dir} {mark_dir} {dfs_name_dir}"),
|
|
|
- not_if=format("test -d {old_mark_dir} || test -d {mark_dir}"),
|
|
|
- path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
- )
|
|
|
-
|
|
|
- Directory(mark_dir,
|
|
|
- recursive = True
|
|
|
- )
|
|
|
+ if not is_namenode_formatted(params):
|
|
|
+ Execute(format(
|
|
|
+ 'sudo su {hdfs_user} - -s /bin/bash -c "export PATH=$PATH:{hadoop_bin_dir} ; yes Y | hdfs --config {hadoop_conf_dir} namenode -format"'),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin",
|
|
|
+ )
|
|
|
+ for m_dir in mark_dir:
|
|
|
+ Directory(m_dir,
|
|
|
+ recursive = True
|
|
|
+ )
|
|
|
else:
|
|
|
if params.dfs_ha_namenode_active is not None:
|
|
|
if params.hostname == params.dfs_ha_namenode_active:
|
|
|
# check and run the format command in the HA deployment scenario
|
|
|
# only format the "active" namenode in an HA deployment
|
|
|
- File(format("{tmp_dir}/checkForFormat.sh"),
|
|
|
- content=StaticFile("checkForFormat.sh"),
|
|
|
- mode=0755)
|
|
|
Execute(format(
|
|
|
- "{tmp_dir}/checkForFormat.sh {hdfs_user} {hadoop_conf_dir} "
|
|
|
- "{hadoop_bin_dir} {old_mark_dir} {mark_dir} {dfs_name_dir}"),
|
|
|
- not_if=format("test -d {old_mark_dir} || test -d {mark_dir}"),
|
|
|
- path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
+ 'sudo su {hdfs_user} - -s /bin/bash -c "export PATH=$PATH:{hadoop_bin_dir} ; yes Y | hdfs --config {hadoop_conf_dir} namenode -format"'),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin",
|
|
|
)
|
|
|
- Directory(mark_dir,
|
|
|
- recursive=True
|
|
|
+ for m_dir in mark_dir:
|
|
|
+ Directory(m_dir,
|
|
|
+ recursive = True
|
|
|
)
|
|
|
|
|
|
|
|
|
+def is_namenode_formatted(params):
|
|
|
+ old_mark_dirs = params.namenode_formatted_old_mark_dirs
|
|
|
+ mark_dirs = params.namenode_formatted_mark_dirs
|
|
|
+ nn_name_dirs = params.dfs_name_dir.split(',')
|
|
|
+ marked = False
|
|
|
+ # Check if name directories have been marked as formatted
|
|
|
+ for mark_dir in mark_dirs:
|
|
|
+ if os.path.isdir(mark_dir):
|
|
|
+ marked = True
|
|
|
+ print format("{mark_dir} exists. Namenode DFS already formatted")
|
|
|
+
|
|
|
+ # Ensure that all mark dirs created for all name directories
|
|
|
+ if marked:
|
|
|
+ for mark_dir in mark_dirs:
|
|
|
+ Directory(mark_dir,
|
|
|
+ recursive = True
|
|
|
+ )
|
|
|
+ return marked
|
|
|
+
|
|
|
+ # Move all old format markers to new place
|
|
|
+ for old_mark_dir in old_mark_dirs:
|
|
|
+ if os.path.isdir(old_mark_dir):
|
|
|
+ for mark_dir in mark_dirs:
|
|
|
+ Execute(format(
|
|
|
+ "sudo cp -ar {old_mark_dir} {mark_dir}"),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
+ )
|
|
|
+ marked = True
|
|
|
+ Execute(format(
|
|
|
+ "sudo rm -rf {old_mark_dir}"),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
+ )
|
|
|
+ elif os.path.isfile(old_mark_dir):
|
|
|
+ for mark_dir in mark_dirs:
|
|
|
+ Execute(format(
|
|
|
+ "sudo mkdir -p ${mark_dir}"),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
+ )
|
|
|
+ Execute(format(
|
|
|
+ "sudo rm -f {old_mark_dir}"),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
+ )
|
|
|
+ marked = True
|
|
|
+
|
|
|
+ # Check if name dirs are not empty
|
|
|
+ for name_dir in nn_name_dirs:
|
|
|
+ try:
|
|
|
+ Execute(format(
|
|
|
+ "sudo ls {name_dir} | wc -l | grep -q ^0$"),
|
|
|
+ path="/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin"
|
|
|
+ )
|
|
|
+ marked = False
|
|
|
+ except Exception:
|
|
|
+ marked = True
|
|
|
+ print format("ERROR: Namenode directory(s) is non empty. Will not format the namenode. List of non-empty namenode dirs {nn_name_dirs}")
|
|
|
+ break
|
|
|
+
|
|
|
+ return marked
|
|
|
+
|
|
|
def decommission():
|
|
|
import params
|
|
|
|