datanode.py

  1. """
  2. Licensed to the Apache Software Foundation (ASF) under one
  3. or more contributor license agreements. See the NOTICE file
  4. distributed with this work for additional information
  5. regarding copyright ownership. The ASF licenses this file
  6. to you under the Apache License, Version 2.0 (the
  7. "License"); you may not use this file except in compliance
  8. with the License. You may obtain a copy of the License at
  9. http://www.apache.org/licenses/LICENSE-2.0
  10. Unless required by applicable law or agreed to in writing, software
  11. distributed under the License is distributed on an "AS IS" BASIS,
  12. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. See the License for the specific language governing permissions and
  14. limitations under the License.
  15. """
import datanode_upgrade

from ambari_commons.constants import UPGRADE_TYPE_ROLLING
from hdfs_datanode import datanode
from resource_management import Fail, shell
from resource_management.libraries.script.script import Script
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions import format
from resource_management.libraries.functions.decorator import retry
from resource_management.libraries.functions.security_commons import build_expectations, \
  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML
from resource_management.core.logger import Logger
from hdfs import hdfs, reconfig
from ambari_commons.os_family_impl import OsFamilyImpl
from ambari_commons import OSConst
from utils import get_hdfs_binary
from utils import get_dfsadmin_base_command


class DataNode(Script):

  def get_hdfs_binary(self):
    """
    Get the name or path to the hdfs binary depending on the component name.
    """
    return get_hdfs_binary("hadoop-hdfs-datanode")

  def install(self, env):
    import params
    env.set_params(params)
    self.install_packages(env)

  def configure(self, env):
    import params
    env.set_params(params)
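    # Sketch of the division of labor here (based on the imported helpers):
    # hdfs("datanode") renders the common HDFS configuration files for this
    # host, and datanode(action="configure") then performs DataNode-specific
    # setup such as preparing the configured data directories.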
    hdfs("datanode")
    datanode(action="configure")

  def save_configs(self, env):
    import params
    env.set_params(params)
    hdfs("datanode")

  def reload_configs(self, env):
    import params
    env.set_params(params)
    Logger.info("RELOAD CONFIGS")
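    # reconfig() asks the running DataNode, addressed by its IPC endpoint, to
    # re-read properties that HDFS supports changing at runtime (the dfsadmin
    # reconfiguration mechanism), so no restart is needed.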
    reconfig("datanode", params.dfs_dn_ipc_address)

  def start(self, env, upgrade_type=None):
    import params
    env.set_params(params)
    self.configure(env)
    datanode(action="start")

  def stop(self, env, upgrade_type=None):
    import params
    env.set_params(params)

    hdfs_binary = self.get_hdfs_binary()
    # during a rolling upgrade the pre-upgrade steps shut down the DataNode
    # gracefully, so only issue a stop here if that shutdown did not happen
    if upgrade_type == UPGRADE_TYPE_ROLLING:
      stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary)
      if not stopped:
        datanode(action="stop")
    else:
      datanode(action="stop")

    # verify that the datanode is down
    self.check_datanode_shutdown(hdfs_binary)

  def status(self, env):
    import status_params
    env.set_params(status_params)
    datanode(action="status")
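
  # @retry re-invokes check_datanode_shutdown whenever it raises Fail; with
  # times=24 and sleep_time=5 the check keeps retrying for up to ~2 minutes.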
  @retry(times=24, sleep_time=5, err_class=Fail)
  def check_datanode_shutdown(self, hdfs_binary):
    """
    Checks that a DataNode is down by running "hdfs dfsadmin -getDatanodeInfo"
    several times, pausing in between runs. Once the DataNode stops responding
    this method will return; otherwise it will raise a Fail(...) and retry
    automatically.

    The stack defaults for HDFS client retries are also far too slow for this
    command; they are set to wait about 45 seconds between client retries. As
    a result, a single execution of dfsadmin will take 45 seconds to retry and
    the DataNode may be marked as dead, causing problems with HBase.
    https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the
    times for ipc.client.connect.retry.interval. In the meantime, override them
    here, but only for rolling upgrades (RU).

    :param hdfs_binary: name/path of the HDFS binary to use
    :return: True once the DataNode has stopped responding
    """
    import params

    # override stock retry timeouts since after 30 seconds, the datanode is
    # marked as dead and can affect HBase during RU
    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary)
    command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}')
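    # The rendered command looks roughly like the following (the actual base
    # command and IPC address come from params):
    #   hdfs dfsadmin -D ipc.client.connect.max.retries=5 \
    #     -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo <host:ipc_port>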

    is_datanode_deregistered = False
    try:
      shell.checked_call(command, user=params.hdfs_user, tries=1)
    except Exception:
      # a failed call means the DataNode is no longer responding on its IPC port
      is_datanode_deregistered = True

    if not is_datanode_deregistered:
      Logger.info("DataNode has not yet deregistered from the NameNode...")
      raise Fail('DataNode has not yet deregistered from the NameNode...')

    Logger.info("DataNode has successfully shut down.")
    return True


@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class DataNodeDefault(DataNode):

  def pre_upgrade_restart(self, env, upgrade_type=None):
    Logger.info("Executing DataNode Stack Upgrade pre-restart")
    import params
    env.set_params(params)
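    # stack_select.select_packages points the stack's *-select tooling at the
    # new version for this component's packages, so the upcoming restart picks
    # up the upgraded binaries.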
    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      stack_select.select_packages(params.version)

  def post_upgrade_restart(self, env, upgrade_type=None):
    Logger.info("Executing DataNode Stack Upgrade post-restart")
    import params
    env.set_params(params)
    hdfs_binary = self.get_hdfs_binary()
    # ensure the DataNode has started and rejoined the cluster
    datanode_upgrade.post_upgrade_check(hdfs_binary)

  def get_log_folder(self):
    import params
    return params.hdfs_log_dir

  def get_user(self):
    import params
    return params.hdfs_user

  def get_pid_files(self):
    import status_params
    return [status_params.datanode_pid_file]


@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class DataNodeWindows(DataNode):
  def install(self, env):
    import install_params
    self.install_packages(env)


if __name__ == "__main__":
  DataNode().execute()