Browse source code

AMBARI-9163. Intermittent Preparing NAMENODE fails during RU due to JOURNALNODE quorum not established (alejandro)

Alejandro Fernandez 10 years ago
parent
commit
fcce59e8ca

+ 9 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/metainfo.xml

@@ -124,6 +124,15 @@
             <scriptType>PYTHON</scriptType>
             <timeout>1200</timeout>
           </commandScript>
+          <dependencies>
+            <dependency>
+              <name>HDFS/HDFS_CLIENT</name>
+              <scope>host</scope>
+              <auto-deploy>
+                <enabled>true</enabled>
+              </auto-deploy>
+            </dependency>
+          </dependencies>
         </component>
 
         <component>
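
This dependency exists because the new post-restart quorum check (journalnode_upgrade.py below) shells out to the hdfs CLI on the Journalnode host. A minimal pre-flight sketch of what the dependency guarantees (hypothetical check, not part of the commit):

# Hypothetical illustration: the post-restart check runs `hdfs dfsadmin
# -rollEdits` on the Journalnode host, so the HDFS client binary must be
# present there. The auto-deploy block above makes Ambari install it.
import distutils.spawn

if distutils.spawn.find_executable("hdfs") is None:
  raise RuntimeError("hdfs client missing; HDFS_CLIENT must be co-located "
                     "with JOURNALNODE for the rolling-upgrade check")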

+ 7 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py

@@ -27,6 +27,7 @@ from resource_management.libraries.functions.security_commons import build_expec
 
 from utils import service
 from hdfs import hdfs
+import journalnode_upgrade
 
 
 class JournalNode(Script):
@@ -64,6 +65,12 @@ class JournalNode(Script):
       create_log_dir=True
     )
 
+  def post_rolling_restart(self, env):
+    Logger.info("Executing Rolling Upgrade post-restart")
+    import params
+    env.set_params(params)
+    journalnode_upgrade.post_upgrade_check()
+
   def stop(self, env, rolling_restart=False):
     import params
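
For context, the post_rolling_restart method added above is one of the lifecycle hooks Ambari invokes around a rolling restart. A simplified sketch of the assumed control flow (the real dispatch lives in resource_management's Script class and is more involved):

# Simplified, assumed control flow -- not the actual Script implementation.
def restart(self, env, rolling_restart=False):
  if rolling_restart and hasattr(self, "pre_rolling_restart"):
    self.pre_rolling_restart(env)   # e.g., relink to the new stack version
  self.stop(env, rolling_restart=rolling_restart)
  self.start(env, rolling_restart=rolling_restart)
  if rolling_restart and hasattr(self, "post_rolling_restart"):
    self.post_rolling_restart(env)  # e.g., the quorum check added here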
 

+ 147 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode_upgrade.py

@@ -0,0 +1,147 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+import time
+import json
+
+from resource_management.core.logger import Logger
+from resource_management.core.resources.system import Execute
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.functions.default import default
+from resource_management.core.exceptions import Fail
+from utils import get_jmx_data, get_port
+
+
+
+def post_upgrade_check():
+  """
+  Ensure all journal nodes are up and quorum is established
+  Raises Fail if the quorum cannot be confirmed.
+  """
+  import params
+  Logger.info("Ensuring Journalnode quorum is established")
+
+  time.sleep(5)
+  hdfs_roll_edits()
+  time.sleep(5)
+
+  all_journal_node_hosts = default("/clusterHostInfo/journalnode_hosts", [])
+
+  if len(all_journal_node_hosts) < 3:
+    raise Fail("Need at least 3 Journalnodes to maintain a quorum")
+
+  # TODO, test with HTTPS
+  policy = default("/configurations/hdfs-site/dfs.http.policy", None)
+  if not policy:
+    raise Fail("Could not retrieve dfs.http.policy")
+  encrypted = policy.upper() == "HTTPS_ONLY"
+
+  nn_address = default("/configurations/hdfs-site/dfs.namenode.https-address", None) if encrypted else \
+    default("/configurations/hdfs-site/dfs.namenode.http-address", None)
+
+  if not nn_address:
+    raise Fail("Could not retrieve dfs.namenode.http(s)-address for policy %s" % str(policy))
+
+  nn_data = get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem', 'JournalTransactionInfo',
+                         encrypted)
+  if not nn_data:
+    raise Fail("Could not retrieve JournalTransactionInfo from JMX")
+
+  try:
+    last_txn_id = int(nn_data['LastAppliedOrWrittenTxId'])
+    success = ensure_jns_have_new_txn(all_journal_node_hosts, last_txn_id)
+
+    if not success:
+      raise Fail("Could not ensure that all Journal nodes have a new log transaction id")
+  except KeyError:
+    raise Fail("JournalTransactionInfo does not have key LastAppliedOrWrittenTxId from JMX info")
+
+
+def hdfs_roll_edits():
+  """
+  Roll the edit logs so that the Namenode will be able to connect to the Journalnode.
+  Requires HDFS_CLIENT on this host, hence the new JOURNALNODE dependency in metainfo.xml.
+  """
+  import params
+
+  # TODO, this will need to be documented, since existing HDP 2.2 clusters will need HDFS_CLIENT on all JOURNALNODE hosts
+  if params.security_enabled:
+    Execute(params.dn_kinit_cmd, user=params.hdfs_user)
+
+  command = 'hdfs dfsadmin -rollEdits'
+  Execute(command, user=params.hdfs_user, tries=1)
+
+
+def ensure_jns_have_new_txn(nodes, last_txn_id):
+  """
+  :param nodes: List of Journalnodes
+  :param last_txn_id: Integer of last transaction id
+  :return: Return true on success, false otherwise
+  """
+  import params
+
+  num_of_jns = len(nodes)
+  actual_txn_ids = {}
+  jns_updated = 0
+  protocol = 'http'
+
+  journal_node_address = default("/configurations/hdfs-site/dfs.journalnode.https-address", None)
+  if journal_node_address:
+    protocol = "https"
+  else:
+    journal_node_address = default("/configurations/hdfs-site/dfs.journalnode.http-address", None)
+
+  if not journal_node_address:
+    raise Fail("Could not retrieve Journal node address")
+
+  jn_port = get_port(journal_node_address)    # default is 8480, encrypted is 8481
+  if not jn_port:
+    raise Fail("Could not retrieve Journalnode port")
+
+  time_out_secs = 3 * 60
+  step_time_secs = 10
+  iterations = int(time_out_secs/step_time_secs)
+
+  Logger.info("Checking if all Journalnodes are updated.")
+  for i in range(iterations):
+    Logger.info('Try %d out of %d' % (i+1, iterations))
+    for node in nodes:
+      # if all JNs are updated, exit early with success
+      if jns_updated == num_of_jns:
+        Logger.info("All journal nodes are updated")
+        return True
+
+      # JN already meets condition, skip it
+      if node in actual_txn_ids and actual_txn_ids[node] and actual_txn_ids[node] >= last_txn_id:
+        continue
+
+      url = '%s://%s:%s' % (protocol, node, jn_port)
+      data = get_jmx_data(url, 'Journal-', 'LastWrittenTxId')
+      if data:
+        actual_txn_ids[node] = int(data)
+        if actual_txn_ids[node] >= last_txn_id:
+          Logger.info("Journalnode %s has a higher transaction id: %s" + str(data))
+          jns_updated += 1
+        else:
+          Logger.info("Journalnode %s is still on transaction id: %s" + str(data))
+
+    Logger.info("Sleeping for %d secs" % step_time_secs)
+    time.sleep(step_time_secs)
+
+  return jns_updated == num_of_jns
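
The loop above is a poll-with-timeout pattern: re-check every 10 seconds for up to 3 minutes, returning early once every Journalnode reports a transaction id at or beyond the Namenode's. The same pattern in isolation, as a generic helper (a sketch, not part of the commit):

import time

def wait_until(predicate, timeout_secs=180, step_secs=10):
  """Poll predicate() every step_secs until it returns True or
  timeout_secs elapse; returns the final truth value. Mirrors the
  retry loop in ensure_jns_have_new_txn above."""
  deadline = time.time() + timeout_secs
  while time.time() < deadline:
    if predicate():
      return True
    time.sleep(step_secs)
  return predicate()

# Usage sketch with a hypothetical fetch_txn_id(node) helper:
# wait_until(lambda: all(fetch_txn_id(n) >= last_txn_id for n in nodes))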

+ 2 - 3
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode_upgrade.py

@@ -23,7 +23,6 @@ from resource_management.core.resources.system import Execute
 from resource_management.libraries.functions.format import format
 from resource_management.core.shell import call
 from resource_management.core.exceptions import Fail
-from resource_management.libraries.functions.decorator import retry
 
 
 class SAFEMODE:
@@ -34,7 +33,6 @@ class SAFEMODE:
 safemode_to_instruction = {SAFEMODE.ON: "enter",
                            SAFEMODE.OFF: "leave"}
 
-@retry(times=3, sleep_time=6, err_class=Fail)
 def reach_safemode_state(user, safemode_state, in_ha):
   """
   Enter or leave safemode for the Namenode.
@@ -68,6 +66,7 @@ def reach_safemode_state(user, safemode_state, in_ha):
 def prepare_rolling_upgrade():
   """
   Rolling Upgrade for HDFS Namenode requires the following.
+  0. Namenode must be up
   1. Leave safemode if the safemode status is not OFF
   2. Execute a rolling upgrade "prepare"
   3. Execute a rolling upgrade "query"
@@ -83,7 +82,7 @@ def prepare_rolling_upgrade():
 
   safemode_transition_successful = reach_safemode_state(user, SAFEMODE.OFF, True)
   if not safemode_transition_successful:
-    raise Fail("Could leave safemode")
+    raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SAFEMODE.OFF))
 
   prepare = "hdfs dfsadmin -rollingUpgrade prepare"
   query = "hdfs dfsadmin -rollingUpgrade query"
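
For reference, the @retry decorator removed above silently re-ran reach_safemode_state up to three times, masking the underlying failure (the Namenode not being up yet). A decorator matching the removed usage would look roughly like this (a sketch; the real one lives in resource_management.libraries.functions.decorator):

import time

def retry(times=3, sleep_time=6, err_class=Exception):
  """Sketch of the removed decorator: re-invoke the wrapped function
  on err_class, sleeping between attempts, re-raising on the last."""
  def wrapper(func):
    def inner(*args, **kwargs):
      for attempt in range(times):
        try:
          return func(*args, **kwargs)
        except err_class:
          if attempt == times - 1:
            raise
          time.sleep(sleep_time)
    return inner
  return wrapper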

+ 37 - 2
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py

@@ -18,6 +18,8 @@ limitations under the License.
 """
 import os
 import re
+import urllib2
+import json
 
 from resource_management import *
 from resource_management.libraries.functions.format import format
@@ -83,7 +85,7 @@ def failover_namenode():
     else:
       Execute(check_standby_cmd,
               user=params.hdfs_user,
-              tries=30,
+              tries=50,
               try_sleep=6,
               logoutput=True)
 
@@ -216,6 +218,39 @@ def service(action=None, name=None, user=None, options="", create_pid_dir=False,
     )
 
 
+def get_jmx_data(nn_address, modeler_type, metric, encrypted=False):
+  """
+  :param nn_address: Namenode address, e.g., host:port; may already be prefixed with "http://" or "https://".
+  If it is not, the encrypted param determines the scheme.
+  :param modeler_type: Modeler type to query using startswith function
+  :param metric: Metric to return
+  :return: Return an object representation of the metric, or None if it does not exist
+  """
+  if not nn_address or not modeler_type or not metric:
+    return None
+
+  nn_address = nn_address.strip()
+  if not nn_address.startswith("http"):
+    nn_address = ("https://" if encrypted else "http://") + nn_address
+  if not nn_address.endswith("/"):
+    nn_address = nn_address + "/"
+
+  nn_address = nn_address + "jmx"
+  Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address))
+
+  data = urllib2.urlopen(nn_address).read()
+  data_dict = json.loads(data)
+  my_data = None
+  if data_dict:
+    for el in data_dict['beans']:
+      if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type):
+        if metric in el:
+          my_data = el[metric]
+          if my_data:
+            my_data = json.loads(str(my_data))
+            break
+  return my_data
+
 def get_port(address):
   """
   Extracts port from the address like 0.0.0.0:1019
@@ -223,7 +258,7 @@ def get_port(address):
   if address is None:
     return None
   m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
-  if m is not None:
+  if m is not None and len(m.groups()) >= 2:
     return int(m.group(2))
   else:
     return None
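
A few inputs showing what the hardened get_port accepts and rejects (illustrative expectations, not part of the commit):

assert get_port("0.0.0.0:1019") == 1019                     # plain host:port
assert get_port("https://namenode.example:50470") == 50470  # scheme is skipped
assert get_port("address-without-a-port") is None           # no match -> None
assert get_port(None) is None                               # guarded explicitly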