瀏覽代碼

AMBARI-2451. Enhance host checks in the api for more checks on the host - users/ntp/others. (smohanty)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/trunk@1498796 13f79535-47bb-0310-9956-ffa450edef68
Sumit Mohanty 12 年之前
父節點
當前提交
f6ee76b0a8

+ 11 - 4
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -59,6 +59,7 @@ class Controller(threading.Thread):
     self.repeatRegistration = False
     self.cachedconnect = None
     self.range = range
+    self.hasMappedComponents = True
 
   def __del__(self):
     logger.info("Server connection disconnected.")
@@ -86,6 +87,8 @@ class Controller(threading.Thread):
           logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
           self.addToQueue(ret['statusCommands'])
           pass
+        else:
+          self.hasMappedComponents = False
         pass
       except ssl.SSLError:
         self.repeatRegistration=False
@@ -120,7 +123,7 @@ class Controller(threading.Thread):
   # For testing purposes
   DEBUG_HEARTBEAT_RETRIES = 0
   DEBUG_SUCCESSFULL_HEARTBEATS = 0
-  DEBUG_STOP_HEARTBITTING = False
+  DEBUG_STOP_HEARTBEATING = False
 
   def heartbeatWithServer(self):
     self.DEBUG_HEARTBEAT_RETRIES = 0
@@ -133,10 +136,11 @@ class Controller(threading.Thread):
 
     #TODO make sure the response id is monotonically increasing
     id = 0
-    while not self.DEBUG_STOP_HEARTBITTING:
+    while not self.DEBUG_STOP_HEARTBEATING:
       try:
         if not retry:
-          data = json.dumps(self.heartbeat.build(self.responseId, int(hb_interval)))
+          data = json.dumps(
+              self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
           pass
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
@@ -147,6 +151,9 @@ class Controller(threading.Thread):
         
         serverId=int(response['responseId'])
 
+        if 'hasMappedComponents' in response.keys():
+          self.hasMappedComponents = response['hasMappedComponents'] != False
+
         if 'registrationCommand' in response.keys():
           # check if the registration command is None. If none skip
           if response['registrationCommand'] is not None:
@@ -209,7 +216,7 @@ class Controller(threading.Thread):
     self.actionQueue = ActionQueue(self.config)
     self.actionQueue.start()
     self.register = Register(self.config)
-    self.heartbeat = Heartbeat(self.actionQueue)
+    self.heartbeat = Heartbeat(self.actionQueue, self.config)
 
     opener = urllib2.build_opener()
     urllib2.install_opener(opener)

+ 27 - 17
ambari-agent/src/main/python/ambari_agent/Hardware.py

@@ -37,7 +37,28 @@ class Hardware:
     otherInfo = self.facterInfo()
     self.hardware.update(otherInfo)
     pass
-  
+
+  @staticmethod
+  def extractMountInfo(outputLine):
+    if outputLine == None or len(outputLine) == 0:
+      return None
+
+      """ this ignores any spaces in the filesystemname and mounts """
+    split = outputLine.split()
+    if (len(split)) == 7:
+      device, type, size, used, available, percent, mountpoint = split
+      mountinfo = {
+        'size' : size,
+        'used' : used,
+        'available' : available,
+        'percent' : percent,
+        'mountpoint' : mountpoint,
+        'type': type,
+        'device' : device }
+      return mountinfo
+    else:
+      return None
+
   def osdisks(self):
     """ Run df to find out the disks on the host. Only works on linux 
     platforms. Note that this parser ignores any filesystems with spaces 
@@ -47,24 +68,13 @@ class Hardware:
     dfdata = df.communicate()[0]
     lines = dfdata.splitlines()
     for l in lines:
-      split = l.split()
-      """ this ignores any spaces in the filesystemname and mounts """
-      if (len(split)) == 7:
-        device, type, size, used, available, percent, mountpoint = split
-        mountinfo = { 
-                     'size' : size,
-                     'used' : used,
-                     'available' : available,
-                     'percent' : percent,
-                     'mountpoint' : mountpoint,
-                     'type': type,
-                     'device' : device }
-        if os.access(mountpoint, os.W_OK):
-          mounts.append(mountinfo)
-        pass
+      mountinfo = self.extractMountInfo(l)
+      if mountinfo != None and os.access(mountinfo['mountpoint'], os.W_OK):
+        mounts.append(mountinfo)
       pass
+    pass
     return mounts
-    
+
   def facterBin(self, facterHome):
     facterBin = facterHome + "/bin/facter"
     if (os.path.exists(facterBin)):

+ 15 - 5
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -34,11 +34,12 @@ logger = logging.getLogger()
 firstContact = True
 class Heartbeat:
 
-  def __init__(self, actionQueue):
+  def __init__(self, actionQueue, config=None):
     self.actionQueue = actionQueue
+    self.config = config
     self.reports = []
 
-  def build(self, id='-1', state_interval=-1):
+  def build(self, id='-1', state_interval=-1, componentsMapped=False):
     global clusterId, clusterDefinitionRevision, firstContact
     timestamp = int(time.time()*1000)
     queueResult = self.actionQueue.result()
@@ -53,19 +54,28 @@ class Heartbeat:
                   'nodeStatus'        : nodeStatus
                 }
 
+    commandsInProgress = False
+    if self.actionQueue.commandQueue.empty() == False:
+      commandsInProgress = True
     if len(queueResult) != 0:
       heartbeat['reports'] = queueResult['reports']
       heartbeat['componentStatus'] = queueResult['componentStatus']
+      if len(heartbeat['reports']) > 0:
+        # There may be IN_PROGRESS tasks
+        commandsInProgress = True
       pass
     logger.info("Sending heartbeat with response id: " + str(id) + " and "
-                "timestamp: " + str(timestamp))
+                "timestamp: " + str(timestamp) +
+                ". Command(s) in progress: " + repr(commandsInProgress) +
+                ". Components mapped: " + repr(componentsMapped))
     logger.debug("Heartbeat : " + pformat(heartbeat))
 
     if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) == 0:
-      hostInfo = HostInfo()
+      hostInfo = HostInfo(self.config)
       nodeInfo = { }
       # for now, just do the same work as registration
-      hostInfo.register(nodeInfo)
+      # this must be the last step before returning heartbeat
+      hostInfo.register(nodeInfo, componentsMapped, commandsInProgress)
       heartbeat['agentEnv'] = nodeInfo
       logger.debug("agentEnv : " + str(nodeInfo))
 

+ 109 - 0
ambari-agent/src/main/python/ambari_agent/HostCheckReportFileHandler.py

@@ -0,0 +1,109 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import datetime
+import os.path
+import logging
+import traceback
+import ConfigParser;
+
+logger = logging.getLogger()
+
+class HostCheckReportFileHandler:
+
+  HOST_CHECK_FILE = "hostcheck.result"
+
+  def __init__(self, config):
+    if config != None:
+      hostCheckFileDir = config.get('agent', 'prefix')
+      self.hostCheckFilePath = os.path.join(hostCheckFileDir, self.HOST_CHECK_FILE)
+
+  def writeHostCheckFile(self, hostInfo):
+    if self.hostCheckFilePath == None:
+      return
+
+    try:
+      logger.info("Host check report at " + self.hostCheckFilePath)
+      config = ConfigParser.RawConfigParser()
+      config.add_section('metadata')
+      config.set('metadata', 'created', str(datetime.datetime.now()))
+
+      if 'existingUsers' in hostInfo.keys():
+        items = []
+        for itemDetail in hostInfo['existingUsers']:
+          items.append(itemDetail['name'])
+        config.add_section('users')
+        config.set('users', 'usr_list', ','.join(items))
+
+      if 'alternatives' in hostInfo.keys():
+        items = []
+        items2 = []
+        for itemDetail in hostInfo['alternatives']:
+          items.append(itemDetail['name'])
+          items2.append(itemDetail['target'])
+        config.add_section('alternatives')
+        config.set('alternatives', 'symlink_list', ','.join(items))
+        config.set('alternatives', 'target_list', ','.join(items2))
+
+      if 'stackFoldersAndFiles' in hostInfo.keys():
+        items = []
+        for itemDetail in hostInfo['stackFoldersAndFiles']:
+          items.append(itemDetail['name'])
+        config.add_section('directories')
+        config.set('directories', 'dir_list', ','.join(items))
+
+      if 'hostHealth' in hostInfo.keys():
+        if 'activeJavaProcs' in hostInfo['hostHealth'].keys():
+          items = []
+          for itemDetail in hostInfo['hostHealth']['activeJavaProcs']:
+            items.append(itemDetail['pid'])
+          config.add_section('processes')
+          config.set('processes', 'proc_list', ','.join(map(str, items)))
+
+      if 'installedPackages' in hostInfo.keys():
+        items = []
+        for itemDetail in hostInfo['installedPackages']:
+          items.append(itemDetail['name'])
+        config.add_section('packages')
+        config.set('packages', 'pkg_list', ','.join(map(str, items)))
+
+      if 'existingRepos' in hostInfo.keys():
+        config.add_section('repositories')
+        config.set('repositories', 'repo_list', ','.join(hostInfo['existingRepos']))
+
+      self.removeFile()
+      self.touchFile()
+      with open(self.hostCheckFilePath, 'wb') as configfile:
+        config.write(configfile)
+    except Exception, err:
+      logger.error("Can't write host check file at %s :%s " % (self.hostCheckFilePath, err.message))
+      traceback.print_exc()
+
+  def removeFile(self):
+    if os.path.isfile(self.hostCheckFilePath):
+      logger.info("Removing old host check file at %s" % self.hostCheckFilePath)
+      os.remove(self.hostCheckFilePath)
+
+  def touchFile(self):
+    if not os.path.isfile(self.hostCheckFilePath):
+      logger.info("Creating host check file at %s" % self.hostCheckFilePath)
+      open(self.hostCheckFilePath, 'w').close()
+
+

+ 229 - 56
ambari-agent/src/main/python/ambari_agent/HostInfo.py

@@ -20,11 +20,77 @@ limitations under the License.
 
 import os
 import glob
+import logging
 import pwd
+import re
+import time
 import subprocess
+import threading
 import AmbariConfig
+from PackagesAnalyzer import PackagesAnalyzer
+from HostCheckReportFileHandler import HostCheckReportFileHandler
+from Hardware import Hardware
+
+logger = logging.getLogger()
+
 
 class HostInfo:
+  # List of project names to be used to find alternatives folders etc.
+  DEFAULT_PROJECT_NAMES = [
+    "hadoop*", "hadoop", "hbase", "hcatalog", "hive", "ganglia", "nagios",
+    "oozie", "sqoop", "hue", "zookeeper", "mapred", "hdfs", "flume",
+    "ambari_qa", "hadoop_deploy", "rrdcached", "hcat"
+  ]
+
+  # List of live services checked for on the host
+  DEFAULT_LIVE_SERVICES = [
+    "ntpd"
+  ]
+
+  # Set of default users (need to be replaced with the configured user names)
+  DEFAULT_USERS = [
+    "nagios", "hive", "ambari-qa", "oozie", "hbase", "hcat", "mapred",
+    "hdfs", "rrdcached", "zookeeper", "mysql", "flume", "sqoop", "sqoop2",
+    "hue", "yarn"
+  ]
+
+  # Filters used to identify processed
+  PROC_FILTER = [
+    "hadoop", "zookeeper"
+  ]
+
+  # Default set of directories that are checked for existence of files and folders
+  DEFAULT_DIRS = [
+    "/etc", "/var/run", "/var/log", "/usr/lib", "/var/lib", "/var/tmp", "/tmp", "/var"
+  ]
+
+  # Packages that are used to find repos (then repos are used to find other packages)
+  PACKAGES = [
+    "hadoop", "zookeeper", "webhcat", "*-manager-server-db", "*-manager-daemons"
+  ]
+
+  # Additional packages to look for (search packages that start with these)
+  ADDITIONAL_PACKAGES = [
+    "rrdtool", "rrdtool-python", "nagios", "ganglia", "gmond", "gweb"
+  ]
+
+  # ignores packages from repos whose names start with these strings
+  IGNORE_PACKAGES_FROM_REPOS = [
+    "ambari", "installed"
+  ]
+
+  IGNORE_REPOS = [
+    "ambari", "HDP-UTILS"
+  ]
+
+  # default timeout for async invoked processes
+  TIMEOUT_SECONDS = 60
+  RESULT_UNAVAILABLE = "unable_to_determine"
+  event = threading.Event()
+
+  def __init__(self, config=None):
+    self.packages = PackagesAnalyzer()
+    self.reportFileHandler = HostCheckReportFileHandler(config)
 
   def dirType(self, path):
     if not os.path.exists(path):
@@ -43,7 +109,7 @@ class HostInfo:
     try:
       for rpmName in config.get('heartbeat', 'rpms').split(','):
         rpmName = rpmName.strip()
-        rpm = { }
+        rpm = {}
         rpm['name'] = rpmName
 
         try:
@@ -72,82 +138,189 @@ class HostInfo:
       return 0
     logs = glob.glob('/var/log/hadoop/*/*.log')
     return len(logs)
-  
-  def etcAlternativesConf(self, etcList):
+
+  def etcAlternativesConf(self, projects, etcResults):
     if not os.path.exists('/etc/alternatives'):
       return []
-    confs = glob.glob('/etc/alternatives/*conf')
-
-    for conf in confs:
-      confinfo = { }
-      realconf = conf
-      if os.path.islink(conf):
-        realconf = os.path.realpath(conf)
-      confinfo['name'] = conf
-      confinfo['target'] = realconf
-      etcList.append(confinfo)
-
-  def repos(self):
-    # future
-    return "could_not_determine"
-    
-
-  def register(self, dict):
-    dict['varLogHadoopLogCount'] = self.hadoopVarLogCount()
-    dict['varRunHadoopPidCount'] = self.hadoopVarRunCount()
-    
-    etcs = []
-    self.etcAlternativesConf(etcs)
-    dict['etcAlternativesConf'] = etcs
-
-    dirs = []
-    config = AmbariConfig.config
+    projectRegex = "'" + '|'.join(projects) + "'"
+    files = [f for f in os.listdir('/etc/alternatives') if re.match(projectRegex, f)]
+    for conf in files:
+      result = {}
+      filePath = os.path.join('/etc/alternatives', conf)
+      if os.path.islink(filePath):
+        realConf = os.path.realpath(filePath)
+        result['name'] = conf
+        result['target'] = realConf
+        etcResults.append(result)
+
+  def checkLiveServices(self, services, result):
+    for service in services:
+      svcCheckResult = {}
+      svcCheckResult['name'] = service
+      svcCheckResult['status'] = "UNKNOWN"
+      svcCheckResult['desc'] = ""
+      try:
+        osStat = subprocess.Popen(["service", service, "status"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        out, err = osStat.communicate()
+        if 0 != osStat.returncode:
+          svcCheckResult['status'] = "Unhealthy"
+          svcCheckResult['desc'] = out
+          if len(out) == 0:
+            svcCheckResult['desc'] = err
+        else:
+          svcCheckResult['status'] = "Healthy"
+      except Exception, e:
+        svcCheckResult['status'] = "Unhealthy"
+        svcCheckResult['desc'] = repr(e)
+      result.append(svcCheckResult)
+
+  def checkUsers(self, users, results):
+    f = open('/etc/passwd', 'r')
+    for userLine in f:
+      fields = userLine.split(":")
+      if fields[0] in users:
+        result = {}
+        homeDir = fields[5]
+        result['name'] = fields[0]
+        result['homeDir'] = fields[5]
+        result['status'] = "Available";
+        if not os.path.exists(homeDir):
+          result['status'] = "Invalid home directory";
+        results.append(result)
+
+  def osdiskAvailableSpace(self, path):
+    diskInfo = {}
     try:
-      for dirName in config.get('heartbeat', 'dirs').split(','):
-        obj = { }
-        obj['type'] = self.dirType(dirName.strip())
-        obj['name'] = dirName.strip()
-        dirs.append(obj)
+      df = subprocess.Popen(["df", "-kPT", path], stdout=subprocess.PIPE)
+      dfdata = df.communicate()[0]
+      return Hardware.extractMountInfo(dfdata.splitlines()[-1])
     except:
       pass
+    return diskInfo
 
-    dict['paths'] = dirs
-
-    java = []
-    self.javaProcs(java)
-    dict['javaProcs'] = java
-
-    rpms = []
-    self.rpmInfo(rpms)
-    dict['rpms'] = rpms
+  def checkFolders(self, basePaths, projectNames, dirs):
+    try:
+      for dirName in basePaths:
+        for project in projectNames:
+          path = os.path.join(dirName.strip(), project.strip())
+          if os.path.exists(path):
+            obj = {}
+            obj['type'] = self.dirType(path)
+            obj['name'] = path
+            dirs.append(obj)
+    except:
+      pass
 
-    dict['repoInfo'] = self.repos()
-    
   def javaProcs(self, list):
     try:
       pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
       for pid in pids:
         cmd = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
         cmd = cmd.replace('\0', ' ')
-        if 'java' in cmd:
-          dict = { }
-          dict['pid'] = int(pid)
-          dict['hadoop'] = True if 'hadoop' in cmd else False
-          dict['command'] = cmd.strip()
-          for line in open(os.path.join('/proc', pid, 'status')):
-            if line.startswith('Uid:'):
-              uid = int(line.split()[1])
-              dict['user'] = pwd.getpwuid(uid).pw_name
-          list.append(dict)
+        if not 'AmbariServer' in cmd:
+          if 'java' in cmd:
+            dict = {}
+            dict['pid'] = int(pid)
+            dict['hadoop'] = False
+            for filter in self.PROC_FILTER:
+              if filter in cmd:
+                dict['hadoop'] = True
+            dict['command'] = cmd.strip()
+            for line in open(os.path.join('/proc', pid, 'status')):
+              if line.startswith('Uid:'):
+                uid = int(line.split()[1])
+                dict['user'] = pwd.getpwuid(uid).pw_name
+            list.append(dict)
     except:
       pass
     pass
 
+  def getReposToRemove(self, repos, ignoreList):
+    reposToRemove = []
+    for repo in repos:
+      addToRemoveList = True
+      for ignoreRepo in ignoreList:
+        if self.packages.nameMatch(ignoreRepo, repo):
+          addToRemoveList = False
+          continue
+      if addToRemoveList:
+        reposToRemove.append(repo)
+    return reposToRemove
+
+  """ Return various details about the host
+  componentsMapped: indicates if any components are mapped to this host
+  commandsInProgress: indicates if any commands are in progress
+  """
+  def register(self, dict, componentsMapped=True, commandsInProgress=True):
+    dict['hostHealth'] = {}
+
+    java = []
+    self.javaProcs(java)
+    dict['hostHealth']['activeJavaProcs'] = java
+
+    dict['hostHealth']['diskStatus'] = [self.osdiskAvailableSpace("/")]
+
+    rpms = []
+    self.rpmInfo(rpms)
+    dict['rpms'] = rpms
+
+    liveSvcs = []
+    self.checkLiveServices(self.DEFAULT_LIVE_SERVICES, liveSvcs)
+    dict['hostHealth']['liveServices'] = liveSvcs
+
+    # If commands are in progress or components are already mapped to this host
+    # Then do not perform certain expensive host checks
+    if componentsMapped or commandsInProgress:
+      dict['existingRepos'] = [self.RESULT_UNAVAILABLE]
+      dict['installedPackages'] = []
+      dict['alternatives'] = []
+      dict['stackFoldersAndFiles'] = []
+      dict['existingUsers'] = []
+
+    else:
+      etcs = []
+      self.etcAlternativesConf(self.DEFAULT_PROJECT_NAMES, etcs)
+      dict['alternatives'] = etcs
+
+      dirs = []
+      self.checkFolders(self.DEFAULT_DIRS, self.DEFAULT_PROJECT_NAMES, dirs)
+      dict['stackFoldersAndFiles'] = dirs
+
+      existingUsers = []
+      self.checkUsers(self.DEFAULT_USERS, existingUsers)
+      dict['existingUsers'] = existingUsers
+
+      installedPackages = []
+      availablePackages = []
+      self.packages.allInstalledPackages(installedPackages)
+      self.packages.allAvailablePackages(availablePackages)
+
+      repos = []
+      self.packages.getInstalledRepos(self.PACKAGES, installedPackages + availablePackages,
+                                      self.IGNORE_PACKAGES_FROM_REPOS, repos)
+      repos = self.getReposToRemove(repos, self.IGNORE_REPOS)
+      dict['existingRepos'] = repos
+
+      packagesInstalled = self.packages.getInstalledPkgsByRepo(repos, installedPackages)
+      additionalPkgsInstalled = self.packages.getInstalledPkgsByNames(
+        self.ADDITIONAL_PACKAGES, installedPackages)
+      allPackages = list(set(packagesInstalled + additionalPkgsInstalled))
+      dict['installedPackages'] = self.packages.getPackageDetails(installedPackages, allPackages)
+      self.reportFileHandler.writeHostCheckFile(dict)
+      pass
+
+    # The time stamp must be recorded at the end
+    dict['hostHealth']['agentTimeStampAtReporting'] = int(time.time() * 1000)
+
+    pass
+
+
 def main(argv=None):
   h = HostInfo()
-  struct = { }
+  struct = {}
   h.register(struct)
   print struct
 
+
 if __name__ == '__main__':
   main()

+ 208 - 0
ambari-agent/src/main/python/ambari_agent/PackagesAnalyzer.py

@@ -0,0 +1,208 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import os
+import logging
+import pwd
+import shell
+import subprocess
+from threading import Thread
+import threading
+
+logger = logging.getLogger()
+
+class PackagesAnalyzer:
+
+  # default timeout for async invoked processes
+  TIMEOUT_SECONDS = 60
+  event = threading.Event()
+
+  def launch_subprocess(self, command):
+    return subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+  def watchdog_func(self, command):
+    self.event.wait(self.TIMEOUT_SECONDS)
+    if command.returncode is None:
+      logger.error("Task timed out and will be killed")
+      shell.killprocessgrp(command.pid)
+    pass
+
+  def subprocessWithTimeout(self, command):
+    osStat = self.launch_subprocess(command)
+    logger.debug("Launching watchdog thread")
+    self.event.clear()
+    thread = Thread(target=self.watchdog_func, args=(osStat, ))
+    thread.start()
+
+    out, err = osStat.communicate()
+    result = {}
+    result['out'] = out
+    result['err'] = err
+    result['retCode'] = osStat.returncode
+
+    self.event.set()
+    thread.join()
+    return result
+
+  # Get all installed package whose name starts with the
+  # strings contained in pkgName
+  def installedPkgsByName(self, allInstalledPackages,
+                          pkgName, installedPkgs):
+    for item in allInstalledPackages:
+      if item[0].find(pkgName) == 0:
+        installedPkgs.append(item[0])
+
+  def hasZypper(self):
+    try:
+      result = self.subprocessWithTimeout(["which", "zypper"])
+      if 0 == result['retCode']:
+        return True
+      else:
+        return False
+    except:
+      pass
+
+  # All installed packages in systems supporting yum
+  def allInstalledPackages(self, allInstalledPackages):
+    if self.hasZypper():
+      return self.lookUpZypperPackages(
+        ["zypper", "search", "--installed-only", "--details"],
+        allInstalledPackages)
+    else:
+      return self.lookUpYumPackages(
+        ["yum", "list", "installed"],
+        'Installed Packages',
+        allInstalledPackages)
+
+  # All available packages in systems supporting yum
+  def allAvailablePackages(self, allAvailablePackages):
+    if self.hasZypper():
+      return self.lookUpZypperPackages(
+        ["zypper", "search", "--uninstalled-only", "--details"],
+        allAvailablePackages)
+    else:
+      return self.lookUpYumPackages(
+        ["yum", "list", "available"],
+        'Available Packages',
+        allAvailablePackages)
+
+  def lookUpYumPackages(self, command, skipTill, allPackages):
+    try:
+      result = self.subprocessWithTimeout(command)
+      if 0 == result['retCode']:
+        lines = result['out'].split('\n')
+        lines = [line.strip() for line in lines]
+        items = []
+        skipIndex = 3
+        for index in range(len(lines)):
+          if skipTill in lines[index]:
+            skipIndex = index + 1
+            break
+
+        for line in lines[skipIndex:]:
+          items = items + line.strip(' \t\n\r').split()
+
+        for i in range(0, len(items), 3):
+          if items[i + 2].find('@') == 0:
+            items[i + 2] = items[i + 2][1:]
+          allPackages.append(items[i:i + 3])
+    except:
+      pass
+
+  def lookUpZypperPackages(self, command, allPackages):
+    try:
+      result = self.subprocessWithTimeout(command)
+      if 0 == result['retCode']:
+        lines = result['out'].split('\n')
+        lines = [line.strip() for line in lines]
+        items = []
+        for index in range(len(lines)):
+          if "--+--" in lines[index]:
+            skipIndex = index + 1
+            break
+
+        for line in lines[skipIndex:]:
+          items = line.strip(' \t\n\r').split('|')
+          allPackages.append([items[1].strip(), items[3].strip(), items[5].strip()])
+    except:
+      pass
+
+  def nameMatch(self, lookupName, actualName):
+    tokens = actualName.strip().split()
+    for token in tokens:
+      if token.lower().find(lookupName.lower()) == 0:
+        return True
+    return False
+
+  # Gets all installed repos by name based on repos that provide any package
+  # contained in hintPackages
+  # Repos starting with value in ignoreRepos will not be returned
+  def getInstalledRepos(self, hintPackages, allPackages, ignoreRepos, repoList):
+    allRepos = []
+    for hintPackage in hintPackages:
+      for item in allPackages:
+        if 0 == item[0].find(hintPackage):
+          if not item[2] in allRepos:
+            allRepos.append(item[2])
+        elif hintPackage[0] == '*':
+          if item[0].find(hintPackage[1:]) > 0:
+            if not item[2] in allRepos:
+              allRepos.append(item[2])
+
+    for repo in allRepos:
+      ignore = False
+      for ignoredRepo in ignoreRepos:
+        if self.nameMatch(ignoredRepo, repo):
+          ignore = True
+      if not ignore:
+        repoList.append(repo)
+
+  # Get all the installed packages from the repos listed in repos
+  def getInstalledPkgsByRepo(self, repos, installedPackages):
+    packagesFromRepo = []
+    for repo in repos:
+      subResult = []
+      for item in installedPackages:
+        if repo == item[2]:
+          subResult.append(item[0])
+      packagesFromRepo = list(set(packagesFromRepo + subResult))
+    return packagesFromRepo
+
+  # Gets all installed packages that start with names in pkgNames
+  def getInstalledPkgsByNames(self, pkgNames, installedPackages):
+    packages = []
+    for pkgName in pkgNames:
+      subResult = []
+      self.installedPkgsByName(installedPackages, pkgName, subResult)
+      packages = list(set(packages + subResult))
+    return packages
+
+  # Gets the name, version, and repoName for the packages
+  def getPackageDetails(self, installedPackages, foundPackages):
+    packageDetails = []
+    for package in foundPackages:
+      pkgDetail = {}
+      for installedPackage in installedPackages:
+        if package == installedPackage[0]:
+          pkgDetail['name'] = installedPackage[0]
+          pkgDetail['version'] = installedPackage[1]
+          pkgDetail['repoName'] = installedPackage[2]
+      packageDetails.append(pkgDetail)
+    return packageDetails

+ 9 - 9
ambari-agent/src/main/python/ambari_agent/Register.py

@@ -38,20 +38,20 @@ class Register:
     global clusterId, clusterDefinitionRevision, firstContact
     timestamp = int(time.time()*1000)
    
-    hostInfo = HostInfo() 
+    hostInfo = HostInfo(self.config)
     agentEnv = { }
-    hostInfo.register(agentEnv)
+    hostInfo.register(agentEnv, False, False)
 
     version = self.read_agent_version()
     
     register = { 'responseId'        : int(id),
-                  'timestamp'         : timestamp,
-                  'hostname'          : hostname.hostname(),
-                  'publicHostname'    : hostname.public_hostname(),
-                  'hardwareProfile'   : self.hardware.get(),
-                  'agentEnv'          : agentEnv,
-                  'agentVersion'      : version
-                }
+                 'timestamp'         : timestamp,
+                 'hostname'          : hostname.hostname(),
+                 'publicHostname'    : hostname.public_hostname(),
+                 'hardwareProfile'   : self.hardware.get(),
+                 'agentEnv'          : agentEnv,
+                 'agentVersion'      : version
+               }
     return register
 
   def read_agent_version(self):

+ 33 - 8
ambari-agent/src/test/python/TestController.py

@@ -262,7 +262,7 @@ class TestController(unittest.TestCase):
     loadsMock.return_value = response
 
     def one_heartbeat(*args, **kwargs):
-      self.controller.DEBUG_STOP_HEARTBITTING = True
+      self.controller.DEBUG_STOP_HEARTBEATING = True
       return "data"
 
     sendRequest.side_effect = one_heartbeat
@@ -282,12 +282,12 @@ class TestController(unittest.TestCase):
         response["responseId"] = "3"
         raise Exception()
       if len(calls) > 0:
-        self.controller.DEBUG_STOP_HEARTBITTING = True
+        self.controller.DEBUG_STOP_HEARTBEATING = True
       return "data"
 
     # exception, retry, successful and stop
     sendRequest.side_effect = retry
-    self.controller.DEBUG_STOP_HEARTBITTING = False
+    self.controller.DEBUG_STOP_HEARTBEATING = False
     self.controller.heartbeatWithServer()
 
     self.assertEqual(1, self.controller.DEBUG_SUCCESSFULL_HEARTBEATS)
@@ -295,18 +295,43 @@ class TestController(unittest.TestCase):
     # retry registration
     response["registrationCommand"] = "true"
     sendRequest.side_effect = one_heartbeat
-    self.controller.DEBUG_STOP_HEARTBITTING = False
+    self.controller.DEBUG_STOP_HEARTBEATING = False
     self.controller.heartbeatWithServer()
 
     self.assertTrue(self.controller.repeatRegistration)
 
+    # components are not mapped
+    response["registrationCommand"] = "false"
+    response["hasMappedComponents"] = False
+    sendRequest.side_effect = one_heartbeat
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    self.controller.heartbeatWithServer()
+
+    self.assertFalse(self.controller.hasMappedComponents)
+
+    # components are mapped
+    response["hasMappedComponents"] = True
+    sendRequest.side_effect = one_heartbeat
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    self.controller.heartbeatWithServer()
+
+    self.assertTrue(self.controller.hasMappedComponents)
+
+    # components are mapped
+    del response["hasMappedComponents"]
+    sendRequest.side_effect = one_heartbeat
+    self.controller.DEBUG_STOP_HEARTBEATING = False
+    self.controller.heartbeatWithServer()
+
+    self.assertTrue(self.controller.hasMappedComponents)
+
     # wrong responseId => restart
     response = {"responseId":"2", "restartAgent":"false"}
     loadsMock.return_value = response
 
     restartAgent = MagicMock(name="restartAgent")
     self.controller.restartAgent = restartAgent
-    self.controller.DEBUG_STOP_HEARTBITTING = False
+    self.controller.DEBUG_STOP_HEARTBEATING = False
     self.controller.heartbeatWithServer()
 
     restartAgent.assert_called_once_with()
@@ -317,7 +342,7 @@ class TestController(unittest.TestCase):
     self.controller.addToQueue = addToQueue
     response["executionCommands"] = "executionCommands"
     response["statusCommands"] = "statusCommands"
-    self.controller.DEBUG_STOP_HEARTBITTING = False
+    self.controller.DEBUG_STOP_HEARTBEATING = False
     self.controller.heartbeatWithServer()
 
     addToQueue.assert_has_calls([call("executionCommands"),
@@ -325,7 +350,7 @@ class TestController(unittest.TestCase):
 
     # restartAgent command
     self.controller.responseId = 1
-    self.controller.DEBUG_STOP_HEARTBITTING = False
+    self.controller.DEBUG_STOP_HEARTBEATING = False
     response["restartAgent"] = "true"
     restartAgent = MagicMock(name="restartAgent")
     self.controller.restartAgent = restartAgent
@@ -335,7 +360,7 @@ class TestController(unittest.TestCase):
 
     # actionQueue not idle
     self.controller.responseId = 1
-    self.controller.DEBUG_STOP_HEARTBITTING = False
+    self.controller.DEBUG_STOP_HEARTBEATING = False
     actionQueue.isIdle.return_value = False
     response["restartAgent"] = "false"
     self.controller.heartbeatWithServer()

+ 57 - 0
ambari-agent/src/test/python/TestHeartbeat.py

@@ -19,6 +19,7 @@ limitations under the License.
 '''
 
 from unittest import TestCase
+import unittest
 from ambari_agent.Heartbeat import Heartbeat
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.LiveStatus import LiveStatus
@@ -28,6 +29,7 @@ import os
 import time
 from mock.mock import patch, MagicMock, call
 from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
+from ambari_agent.HostInfo import HostInfo
 import StringIO
 import sys
 
@@ -111,6 +113,58 @@ class TestHeartbeat(TestCase):
     NUMBER_OF_COMPONENTS = 1
     self.assertEquals(max_number_of_status_entries == NUMBER_OF_COMPONENTS, True)
 
+  @patch.object(HostInfo, 'register')
+  def test_heartbeat_no_host_check_cmd_in_progress(self, register_mock):
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+    actionQueue.commandInProgress= {
+      'role' : "role",
+      'actionId' : "actionId",
+      'taskId' : "taskId",
+      'stdout' : "stdout",
+      'clusterName' : "clusterName",
+      'stderr' : 'none',
+      'exitCode' : 777,
+      'serviceName' : "serviceName",
+      'status' : 'IN_PROGRESS',
+      'configurations':{'global' : {}},
+      'roleCommand' : 'START'
+    }
+    heartbeat = Heartbeat(actionQueue)
+    heartbeat.build(12, 6)
+    self.assertTrue(register_mock.called)
+    args, kwargs = register_mock.call_args_list[0]
+    self.assertTrue(args[2])
+    self.assertFalse(args[1])
+
+  @patch.object(HostInfo, 'register')
+  def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+    statusCommand = {
+      "serviceName" : 'HDFS',
+      "commandType" : "STATUS_COMMAND",
+      "clusterName" : "",
+      "componentName" : "DATANODE",
+      'configurations':{'global' : {}}
+    }
+    actionQueue.commandQueue.put(statusCommand)
+
+    heartbeat = Heartbeat(actionQueue)
+    heartbeat.build(12, 6)
+    self.assertTrue(register_mock.called)
+    args, kwargs = register_mock.call_args_list[0]
+    self.assertTrue(args[2])
+    self.assertFalse(args[1])
+
+  @patch.object(HostInfo, 'register')
+  def test_heartbeat_host_check_no_cmd(self, register_mock):
+    actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
+    heartbeat = Heartbeat(actionQueue)
+    heartbeat.build(12, 6)
+    self.assertTrue(register_mock.called)
+    args, kwargs = register_mock.call_args_list[0]
+    self.assertFalse(args[1])
+    self.assertFalse(args[2])
+
   def test_heartbeat_with_task_in_progress(self):
     actionQueue = ActionQueue(AmbariConfig.AmbariConfig().getConfig())
     actionQueue.commandInProgress= {
@@ -141,3 +195,6 @@ class TestHeartbeat(TestCase):
     self.assertEquals(result['reports'][0]['status'], "IN_PROGRESS")
     self.assertEquals(result['reports'][0]['roleCommand'], "START")
     pass
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)

+ 158 - 0
ambari-agent/src/test/python/TestHostCheckReportFileHandler.py

@@ -0,0 +1,158 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import unittest
+import os
+import tempfile
+from ambari_agent.HostCheckReportFileHandler import HostCheckReportFileHandler
+import logging
+import ConfigParser
+
+class TestHostCheckReportFileHandler(TestCase):
+
+  logger = logging.getLogger()
+
+  def test_write_host_check_report_really_empty(self):
+    tmpfile = tempfile.mktemp()
+
+    config = ConfigParser.RawConfigParser()
+    config.add_section('agent')
+    config.set('agent', 'prefix', os.path.dirname(tmpfile))
+
+    handler = HostCheckReportFileHandler(config)
+    dict = {}
+    handler.writeHostCheckFile(dict)
+
+    configValidator = ConfigParser.RawConfigParser()
+    configPath = os.path.join(os.path.dirname(tmpfile), HostCheckReportFileHandler.HOST_CHECK_FILE)
+    configValidator.read(configPath)
+    if configValidator.has_section('users'):
+      users = configValidator.get('users', 'usr_list')
+      self.assertEquals(users, '')
+
+  def test_write_host_check_report_empty(self):
+    tmpfile = tempfile.mktemp()
+
+    config = ConfigParser.RawConfigParser()
+    config.add_section('agent')
+    config.set('agent', 'prefix', os.path.dirname(tmpfile))
+
+    handler = HostCheckReportFileHandler(config)
+    dict = {}
+    dict['hostHealth'] = {}
+    dict['existingUsers'] = []
+    dict['alternatives'] = []
+    dict['stackFoldersAndFiles'] = []
+    dict['hostHealth']['activeJavaProcs'] = []
+    dict['installedPackages'] = []
+    dict['existingRepos'] = []
+
+    handler.writeHostCheckFile(dict)
+
+    configValidator = ConfigParser.RawConfigParser()
+    configPath = os.path.join(os.path.dirname(tmpfile), HostCheckReportFileHandler.HOST_CHECK_FILE)
+    configValidator.read(configPath)
+    users = configValidator.get('users', 'usr_list')
+    self.assertEquals(users, '')
+    names = configValidator.get('alternatives', 'symlink_list')
+    targets = configValidator.get('alternatives', 'target_list')
+    self.assertEquals(names, '')
+    self.assertEquals(targets, '')
+
+    paths = configValidator.get('directories', 'dir_list')
+    self.assertEquals(paths, '')
+
+    procs = configValidator.get('processes', 'proc_list')
+    self.assertEquals(procs, '')
+
+    pkgs = configValidator.get('packages', 'pkg_list')
+    self.assertEquals(pkgs, '')
+
+    repos = configValidator.get('repositories', 'repo_list')
+    self.assertEquals(repos, '')
+
+    time = configValidator.get('metadata', 'created')
+    self.assertTrue(time != None)
+
+  def test_write_host_check_report(self):
+    tmpfile = tempfile.mktemp()
+
+    config = ConfigParser.RawConfigParser()
+    config.add_section('agent')
+    config.set('agent', 'prefix', os.path.dirname(tmpfile))
+
+    handler = HostCheckReportFileHandler(config)
+
+    dict = {}
+    dict['hostHealth'] = {}
+    dict['existingUsers'] = [{'name':'user1', 'homeDir':'/var/log', 'status':'Exists'}]
+    dict['alternatives'] = [
+      {'name':'/etc/alternatives/hadoop-conf', 'target':'/etc/hadoop/conf.dist'},
+      {'name':'/etc/alternatives/hbase-conf', 'target':'/etc/hbase/conf.1'}
+    ]
+    dict['stackFoldersAndFiles'] = [{'name':'/a/b', 'type':'directory'},{'name':'/a/b.txt', 'type':'file'}]
+    dict['hostHealth']['activeJavaProcs'] = [
+      {'pid':355,'hadoop':True,'command':'some command','user':'root'},
+      {'pid':455,'hadoop':True,'command':'some command','user':'hdfs'}
+    ]
+    dict['installedPackages'] = [
+      {'name':'hadoop','version':'3.2.3','repoName':'HDP'},
+      {'name':'hadoop-lib','version':'3.2.3','repoName':'HDP'}
+    ]
+    dict['existingRepos'] = ['HDP', 'HDP-epel']
+    handler.writeHostCheckFile(dict)
+
+    configValidator = ConfigParser.RawConfigParser()
+    configPath = os.path.join(os.path.dirname(tmpfile), HostCheckReportFileHandler.HOST_CHECK_FILE)
+    configValidator.read(configPath)
+    users = configValidator.get('users', 'usr_list')
+    self.assertEquals(users, 'user1')
+
+    names = configValidator.get('alternatives', 'symlink_list')
+    targets = configValidator.get('alternatives', 'target_list')
+    self.chkItemsEqual(names, ['/etc/alternatives/hadoop-conf', '/etc/alternatives/hbase-conf'])
+    self.chkItemsEqual(targets, ['/etc/hadoop/conf.dist','/etc/hbase/conf.1'])
+
+    paths = configValidator.get('directories', 'dir_list')
+    self.chkItemsEqual(paths, ['/a/b','/a/b.txt'])
+
+    procs = configValidator.get('processes', 'proc_list')
+    self.chkItemsEqual(procs, ['455', '355'])
+
+    pkgs = configValidator.get('packages', 'pkg_list')
+    self.chkItemsEqual(pkgs, ['hadoop', 'hadoop-lib'])
+
+    repos = configValidator.get('repositories', 'repo_list')
+    self.chkItemsEqual(repos, ['HDP', 'HDP-epel'])
+
+    time = configValidator.get('metadata', 'created')
+    self.assertTrue(time != None)
+
+  def chkItemsEqual(self, commaDelimited, items):
+    items1 = commaDelimited.split(',')
+    items1.sort()
+    items.sort()
+    items1Str = ','.join(items1)
+    items2Str = ','.join(items)
+    self.assertEquals(items1Str, items2Str)
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)

+ 284 - 0
ambari-agent/src/test/python/TestHostInfo.py

@@ -0,0 +1,284 @@
+#!/usr/bin/env python2.6
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import logging
+import unittest
+from mock.mock import patch
+from mock.mock import MagicMock
+from mock.mock import create_autospec
+from ambari_agent.HostCheckReportFileHandler import HostCheckReportFileHandler
+from ambari_agent.PackagesAnalyzer import PackagesAnalyzer
+from ambari_agent.HostInfo import HostInfo
+
+class TestHostInfo(TestCase):
+
+  logger = logging.getLogger()
+
+  @patch.object(PackagesAnalyzer, 'hasZypper')
+  @patch.object(PackagesAnalyzer, 'subprocessWithTimeout')
+  def test_analyze_zypper_out(self, spwt_mock, hasZy_mock):
+    packageAnalyzer = PackagesAnalyzer()
+    stringToRead = """Refreshing service 'susecloud'.
+           Loading repository data...
+           Reading installed packages...
+
+           S | Name                              | Type    | Version                | Arch   | Repository
+           --+-----------------------------------+---------+------------------------+--------+----------------------
+           i | ConsoleKit                        | package | 0.2.10-64.65.1         | x86_64 | SLES11-SP1-Updates
+           i | gweb                              | package | 2.2.0-99               | noarch | Hortonworks Data Platform Utils Version - HDP-UTILS-1.1.0.15
+           i | hadoop                            | package | 1.2.0.1.3.0.0-107      | x86_64 | HDP
+           i | hadoop-libhdfs                    | package | 1.2.0.1.3.0.0-107      | x86_64 | HDP
+           i | ambari-server                     | package | 1.2.4.9-1              | noarch | Ambari 1.x
+           i | hdp_mon_ganglia_addons            | package | 1.2.4.9-1              | noarch | Ambari 1.x
+           i | Minimal                           | pattern | 11-38.13.9             | x86_64 | SLES11-SP1"""
+    result = {}
+    result['out'] = stringToRead
+    result['err'] = ""
+    result['retCode'] = 0
+
+    spwt_mock.return_value = result
+    hasZy_mock.return_value = True
+    installedPackages = []
+    packageAnalyzer.allInstalledPackages(installedPackages)
+    self.assertEqual(7, len(installedPackages))
+    self.assertTrue(installedPackages[1][0], "gweb")
+    self.assertTrue(installedPackages[3][2], "HDP")
+    self.assertTrue(installedPackages[6][1], "11-38.13.9")
+
+  def test_getReposToRemove(self):
+    l1 = ["Hortonworks Data Platform Utils Version - HDP-UTILS-1.1.0.15", "Ambari 1.x", "HDP"]
+    l2 = ["Ambari", "HDP-UTIL"]
+    hostInfo = HostInfo()
+    l3 = hostInfo.getReposToRemove(l1, l2)
+    self.assertTrue(1, len(l3))
+    self.assertEqual(l3[0], "HDP")
+
+    l1 = ["AMBARI.dev-1.x", "HDP-1.3.0"]
+    l3 = hostInfo.getReposToRemove(l1, l2)
+    self.assertTrue(1, len(l3))
+    self.assertEqual(l3[0], "HDP-1.3.0")
+
+  def test_perform_package_analysis(self):
+    packageAnalyzer = PackagesAnalyzer()
+    installedPackages = [
+      ["hadoop-a", "2.3", "HDP"], ["zk", "3.1", "HDP"], ["webhcat", "3.1", "HDP"],
+      ["hadoop-b", "2.3", "HDP-epel"], ["epel", "3.1", "HDP-epel"], ["epel-2", "3.1", "HDP-epel"],
+      ["hadoop-c", "2.3", "Ambari"], ["ambari-s", "3.1", "Ambari"],
+      ["nagios", "2.3", "NAGIOS"], ["rrd", "3.1", "RRD"],
+      ["keeper-1", "2.3", "NAGIOS"], ["keeper-2", "3.1", "base"],["def-def.x86", "2.2", "DEF.3"],
+      ["def.1", "1.2", "NewDEF"]
+    ]
+    availablePackages = [
+      ["hadoop-d", "2.3", "HDP"], ["zk-2", "3.1", "HDP"], ["pig", "3.1", "HDP"],
+      ["epel-3", "2.3", "HDP-epel"], ["hadoop-e", "3.1", "HDP-epel"],
+      ["ambari-a", "3.1", "Ambari"],
+      ["keeper-3", "3.1", "base"]
+    ]
+
+    packagesToLook = ["webhcat", "hadoop", "*-def"]
+    reposToIgnore = ["ambari"]
+    additionalPackages = ["nagios", "rrd"]
+
+    repos = []
+    packageAnalyzer.getInstalledRepos(packagesToLook, installedPackages + availablePackages, reposToIgnore, repos)
+    self.assertEqual(3, len(repos))
+    expected = ["HDP", "HDP-epel", "DEF.3"]
+    for repo in expected:
+      self.assertTrue(repo in repos)
+
+    packagesInstalled = packageAnalyzer.getInstalledPkgsByRepo(repos, installedPackages)
+    self.assertEqual(7, len(packagesInstalled))
+    expected = ["hadoop-a", "zk", "webhcat", "hadoop-b", "epel", "epel-2", "def-def.x86"]
+    for repo in expected:
+      self.assertTrue(repo in packagesInstalled)
+
+    additionalPkgsInstalled = packageAnalyzer.getInstalledPkgsByNames(
+        additionalPackages, installedPackages)
+    self.assertEqual(2, len(additionalPkgsInstalled))
+    expected = ["nagios", "rrd"]
+    for additionalPkg in expected:
+      self.assertTrue(additionalPkg in additionalPkgsInstalled)
+
+    allPackages = list(set(packagesInstalled + additionalPkgsInstalled))
+    self.assertEqual(9, len(allPackages))
+    expected = ["hadoop-a", "zk", "webhcat", "hadoop-b", "epel", "epel-2", "nagios", "rrd", "def-def.x86"]
+    for package in expected:
+      self.assertTrue(package in allPackages)
+
+  @patch.object(PackagesAnalyzer, 'hasZypper')
+  @patch.object(PackagesAnalyzer, 'subprocessWithTimeout')
+  def test_analyze_yum_output(self, subprocessWithTimeout_mock, hasZy_mock):
+    packageAnalyzer = PackagesAnalyzer()
+    stringToRead = """Loaded plugins: amazon-id, product-id, rhui-lb, security, subscription-manager
+                      Updating certificate-based repositories.
+                      Installed Packages
+                      AMBARI.dev.noarch             1.x-1.el6             installed
+                      PyXML.x86_64                  0.8.4-19.el6          @koji-override-0
+                      Red_Hat_Enterprise_Linux-Release_Notes-6-en-US.noarch
+                              3-7.el6               @koji-override-0
+                      hcatalog.noarch               0.11.0.1.3.0.0-107.el6
+                                                    @HDP-1.3.0
+                      hesiod.x86_64                 3.1.0-19.el6          @koji-override-0/$releasever
+                      hive.noarch                   0.11.0.1.3.0.0-107.el6
+                                                    @HDP-1.3.0
+                      oracle-server-db.x86          1.3.17-2
+                                                    @Oracle-11g
+                      ambari-log4j.noarch           1.2.5.9-1             @AMBARI.dev-1.x"""
+    result = {}
+    result['out'] = stringToRead
+    result['err'] = ""
+    result['retCode'] = 0
+
+    subprocessWithTimeout_mock.return_value = result
+    hasZy_mock.return_value = False
+    installedPackages = []
+    packageAnalyzer.allInstalledPackages(installedPackages)
+    self.assertEqual(8, len(installedPackages))
+    for package in installedPackages:
+      self.assertTrue(package[0] in ["AMBARI.dev.noarch", "PyXML.x86_64", "oracle-server-db.x86",
+                                 "Red_Hat_Enterprise_Linux-Release_Notes-6-en-US.noarch",
+                                 "hcatalog.noarch", "hesiod.x86_64", "hive.noarch", "ambari-log4j.noarch"])
+      self.assertTrue(package[1] in ["1.x-1.el6", "0.8.4-19.el6", "3-7.el6", "3.1.0-19.el6",
+                                 "0.11.0.1.3.0.0-107.el6", "1.2.5.9-1", "1.3.17-2"])
+      self.assertTrue(package[2] in ["installed", "koji-override-0", "HDP-1.3.0",
+                                 "koji-override-0/$releasever", "AMBARI.dev-1.x", "Oracle-11g"])
+
+    packages = packageAnalyzer.getInstalledPkgsByNames(["AMBARI", "Red_Hat_Enterprise", "hesiod", "hive"],
+                                                       installedPackages)
+    self.assertEqual(4, len(packages))
+    expected = ["AMBARI.dev.noarch", "Red_Hat_Enterprise_Linux-Release_Notes-6-en-US.noarch",
+                                "hesiod.x86_64", "hive.noarch"]
+    for package in expected:
+      self.assertTrue(package in packages)
+
+    detailedPackages = packageAnalyzer.getPackageDetails(installedPackages, packages)
+    self.assertEqual(4, len(detailedPackages))
+    for package in detailedPackages:
+      self.assertTrue(package['version'] in ["1.x-1.el6", "3-7.el6", "3.1.0-19.el6",
+                                            "0.11.0.1.3.0.0-107.el6"])
+      self.assertTrue(package['repoName'] in ["installed", "koji-override-0", "HDP-1.3.0",
+                                              "koji-override-0/$releasever"])
+      self.assertFalse(package['repoName'] in ["AMBARI.dev-1.x"])
+
+  @patch.object(PackagesAnalyzer, 'subprocessWithTimeout')
+  def test_analyze_yum_output_err(self, subprocessWithTimeout_mock):
+    packageAnalyzer = PackagesAnalyzer()
+
+    result = {}
+    result['out'] = ""
+    result['err'] = ""
+    result['retCode'] = 1
+
+    subprocessWithTimeout_mock.return_value = result
+    installedPackages = []
+    packageAnalyzer.allInstalledPackages(installedPackages)
+    self.assertEqual(installedPackages, [])
+
+  @patch('os.path.exists')
+  def test_checkFolders(self, path_mock):
+    path_mock.return_value = True
+    hostInfo = HostInfo()
+    results = []
+    hostInfo.checkFolders(["/etc/conf", "/var/lib"], ["a1", "b1"], results)
+    self.assertEqual(4, len(results))
+    names = [i['name'] for i in results]
+    for item in ['/etc/conf/a1', '/var/lib/a1', '/etc/conf/b1', '/var/lib/b1']:
+      self.assertTrue(item in names)
+
+  @patch('os.path.exists')
+  @patch('__builtin__.open')
+  def test_checkUsers(self, builtins_open_mock, path_mock):
+    builtins_open_mock.return_value = [
+      "hdfs:x:493:502:Hadoop HDFS:/usr/lib/hadoop:/bin/bash",
+      "zookeeper:x:492:502:ZooKeeper:/var/run/zookeeper:/bin/bash"]
+    path_mock.return_value = True
+
+    hostInfo = HostInfo()
+    results = []
+    hostInfo.checkUsers(["zookeeper", "hdfs"], results)
+    self.assertEqual(2, len(results))
+    newlist = sorted(results, key=lambda k: k['name'])
+    self.assertTrue(newlist[0]['name'], "hdfs")
+    self.assertTrue(newlist[1]['name'], "zookeeper")
+    self.assertTrue(newlist[0]['homeDir'], "/usr/lib/hadoop")
+    self.assertTrue(newlist[1]['homeDir'], "/var/run/zookeeper")
+
+  @patch.object(HostInfo, 'osdiskAvailableSpace')
+  @patch.object(HostCheckReportFileHandler, 'writeHostCheckFile')
+  @patch.object(PackagesAnalyzer, 'allAvailablePackages')
+  @patch.object(PackagesAnalyzer, 'allInstalledPackages')
+  @patch.object(PackagesAnalyzer, 'getPackageDetails')
+  @patch.object(PackagesAnalyzer, 'getInstalledPkgsByNames')
+  @patch.object(PackagesAnalyzer, 'getInstalledPkgsByRepo')
+  @patch.object(PackagesAnalyzer, 'getInstalledRepos')
+  @patch.object(HostInfo, 'checkUsers')
+  @patch.object(HostInfo, 'checkLiveServices')
+  @patch.object(HostInfo, 'javaProcs')
+  @patch.object(HostInfo, 'checkFolders')
+  @patch.object(HostInfo, 'etcAlternativesConf')
+  @patch.object(HostInfo, 'hadoopVarRunCount')
+  @patch.object(HostInfo, 'hadoopVarLogCount')
+  def test_hostinfo_register(self, hvlc_mock, hvrc_mock, eac_mock, cf_mock, jp_mock,
+                             cls_mock, cu_mock, gir_mock, gipbr_mock, gipbn_mock,
+                             gpd_mock, aip_mock, aap_mock, whcf_mock, odas_mock):
+    hvlc_mock.return_value = 1
+    hvrc_mock.return_value = 1
+    gipbr_mock.return_value = ["pkg1"]
+    gipbn_mock.return_value = ["pkg2"]
+    gpd_mock.return_value = ["pkg1", "pkg2"]
+    odas_mock.return_value = [{'name':'name1'}]
+
+    hostInfo = HostInfo()
+    dict = {}
+    hostInfo.register(dict, True, True)
+    self.verifyReturnedValues(dict)
+
+    hostInfo.register(dict, True, False)
+    self.verifyReturnedValues(dict)
+
+    hostInfo.register(dict, False, True)
+    self.verifyReturnedValues(dict)
+
+    hostInfo = HostInfo()
+    dict = {}
+    hostInfo.register(dict, False, False)
+    self.assertTrue(gir_mock.called)
+    self.assertTrue(gpd_mock.called)
+    self.assertTrue(aip_mock.called)
+    self.assertTrue(odas_mock.called)
+
+    for existingPkg in ["pkg1", "pkg2"]:
+      self.assertTrue(existingPkg in dict['installedPackages'])
+    args, kwargs = gpd_mock.call_args_list[0]
+    for existingPkg in ["pkg1", "pkg2"]:
+      self.assertTrue(existingPkg in args[1])
+
+  def verifyReturnedValues(self, dict):
+    hostInfo = HostInfo()
+    self.assertEqual(dict['alternatives'], [])
+    self.assertEqual(dict['stackFoldersAndFiles'], [])
+    self.assertEqual(dict['existingUsers'], [])
+    self.assertEqual(dict['existingRepos'][0], hostInfo.RESULT_UNAVAILABLE)
+    self.assertEqual(dict['installedPackages'], [])
+    self.assertEqual(1, len(dict['hostHealth']['diskStatus']))
+
+if __name__ == "__main__":
+  unittest.main(verbosity=2)

+ 225 - 57
ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java

@@ -29,96 +29,166 @@ public class AgentEnv {
   /**
    * Various directories, configurable in <code>ambari-agent.ini</code>
    */
-  private Directory[] paths = new Directory[0];
+  private Directory[] stackFoldersAndFiles = new Directory[0];
 
-  /**
-   * Java processes running on the system.  Default empty array.
-   */
-  private JavaProc[] javaProcs = new JavaProc[0];
-  
   /**
    * Various RPM package versions.
    */
   private Rpm[] rpms = new Rpm[0];
-  
+
   /**
-   * Number of pid files found in <code>/var/run/hadoop</code>
+   * Directories that match name <code>/etc/alternatives/*conf</code>
    */
-  private int varRunHadoopPidCount = 0;
-  
+  private Alternative[] alternatives = new Alternative[0];
+
   /**
-   * Number of log files found in <code>/var/log/hadoop</code>
+   * List of existing users
    */
-  private int varLogHadoopLogCount = 0;
+  private ExistingUser[] existingUsers = new ExistingUser[0];
 
   /**
-   * Directories that match name <code>/etc/alternatives/*conf</code>
+   * List of repos
    */
-  private Alternative[] etcAlternativesConf = new Alternative[0];
+  private String[] existingRepos = new String[0];
 
   /**
-   * Output for repo listing.  Command to do this varies, but for RHEL it is
-   * <code>yum -C repolist</code>
+   * List of packages
    */
-  private String repoInfo;
-  
+  private PackageDetail[] installedPackages = new PackageDetail[0];
 
-  public Directory[] getPaths() {
-      return paths;
+  /**
+   * The host health report
+   */
+  private HostHealth hostHealth = new HostHealth();
+
+  public Directory[] getStackFoldersAndFiles() {
+      return stackFoldersAndFiles;
   }
   
-  public void setPaths(Directory[] dirs) {
-    paths = dirs;
+  public void setStackFoldersAndFiles(Directory[] dirs) {
+    stackFoldersAndFiles = dirs;
   }
   
-  public void setVarRunHadoopPidCount(int count) {
-    varRunHadoopPidCount = count;
+  public void setRpms(Rpm[] rpm) {
+    rpms = rpm;
   }
   
-  public int getVarRunHadoopPidCount() {
-    return varRunHadoopPidCount;
+  public Rpm[] getRpms() {
+    return rpms;
   }
   
-  public void setVarLogHadoopLogCount(int count) {
-    varLogHadoopLogCount = count;
+  public void setExistingUsers(ExistingUser[] users) {
+    existingUsers = users;
   }
-  
-  public int getVarLogHadoopLogCount() {
-    return varLogHadoopLogCount;
+
+  public ExistingUser[] getExistingUsers() {
+    return existingUsers;
   }
-  
-  public void setJavaProcs(JavaProc[] procs) {
-    javaProcs = procs;
+
+  public void setAlternatives(Alternative[] dirs) {
+    alternatives = dirs;
   }
-  
-  public JavaProc[] getJavaProcs() {
-    return javaProcs;
+
+  public Alternative[] getAlternatives() {
+    return alternatives;
   }
-  
-  public void setRpms(Rpm[] rpm) {
-    rpms = rpm;
+
+  public void setExistingRepos(String[] repos) {
+    existingRepos = repos;
   }
-  
-  public Rpm[] getRpms() {
-    return rpms;
+
+  public String[] getExistingRepos() {
+    return existingRepos;
   }
-  
-  public void setEtcAlternativesConf(Alternative[] dirs) {
-    etcAlternativesConf = dirs;
+
+  public void setInstalledPackages(PackageDetail[] packages) {
+    installedPackages = packages;
   }
-  
-  public Alternative[] getEtcAlternativesConf() {
-    return etcAlternativesConf;
+
+  public PackageDetail[] getInstalledPackages() {
+    return installedPackages;
   }
-  
-  public void setRepoInfo(String info) {
-    repoInfo = info;
+
+  public void setHostHealth(HostHealth healthReport) {
+    hostHealth = healthReport;
   }
-  
-  public String getRepoInfo() {
-    return repoInfo;
+
+  public HostHealth getHostHealth() {
+    return hostHealth;
   }
-  
+
+  public static class HostHealth {
+    /**
+     * Java processes running on the system.  Default empty array.
+     */
+    @SerializedName("activeJavaProcs")
+    private JavaProc[] activeJavaProcs = new JavaProc[0];
+
+    /**
+     * The current time when agent send the host check report
+     */
+    @SerializedName("agentTimeStampAtReporting")
+    private long agentTimeStampAtReporting = 0;
+
+    /**
+     * The current time when host check report was received
+     */
+    @SerializedName("serverTimeStampAtReporting")
+    private long serverTimeStampAtReporting = 0;
+
+    /**
+     * Live services running on the agent
+     */
+    @SerializedName("liveServices")
+    private LiveService[] liveServices = new LiveService[0];
+
+    /**
+     * The available space in the root disk
+     */
+    @SerializedName("diskStatus")
+    private DiskInfo[] diskStatus = new DiskInfo[0];
+
+    public void setAgentTimeStampAtReporting(long currentTime) {
+      agentTimeStampAtReporting = currentTime;
+    }
+
+    public long getAgentTimeStampAtReporting() {
+      return agentTimeStampAtReporting;
+    }
+
+    public void setServerTimeStampAtReporting(long currentTime) {
+      serverTimeStampAtReporting = currentTime;
+    }
+
+    public long getServerTimeStampAtReporting() {
+      return serverTimeStampAtReporting;
+    }
+
+    public void setActiveJavaProcs(JavaProc[] procs) {
+      activeJavaProcs = procs;
+    }
+
+    public JavaProc[] getActiveJavaProcs() {
+      return activeJavaProcs;
+    }
+
+    public void setLiveServices(LiveService[] services) {
+      liveServices = services;
+    }
+
+    public LiveService[] getLiveServices() {
+      return liveServices;
+    }
+
+    public void setDiskStatus(DiskInfo[] diskInfo) {
+      diskStatus = diskInfo;
+    }
+
+    public DiskInfo[] getDiskStatus() {
+      return diskStatus;
+    }
+  }
+
   /**
    * Represents information about rpm-installed packages
    */
@@ -155,6 +225,39 @@ public class AgentEnv {
       return rpmVersion;
     }
   }
+
+  public static class PackageDetail {
+    @SerializedName("name")
+    private String pkgName;
+    @SerializedName("version")
+    private String pkgVersion;
+    @SerializedName("repoName")
+    private String pkgRepoName;
+
+    public void setName(String name) {
+      pkgName = name;
+    }
+
+    public String getName() {
+      return pkgName;
+    }
+
+    public void setVersion(String version) {
+      pkgVersion = version;
+    }
+
+    public String getVersion() {
+      return pkgVersion;
+    }
+
+    public void setRepoName(String repoName) {
+      pkgRepoName = repoName;
+    }
+
+    public String getRepoName() {
+      return pkgRepoName;
+    }
+  }
   
   /**
    * Represents information about a directory of interest.
@@ -250,5 +353,70 @@ public class AgentEnv {
       return altTarget;
     }
   }
-  
+
+  public static class LiveService {
+    @SerializedName("name")
+    private String svcName;
+    @SerializedName("status")
+    private String svcStatus;
+    @SerializedName("desc")
+    private String svcDesc;
+
+    public void setName(String name) {
+      svcName = name;
+    }
+
+    public String getName() {
+      return svcName;
+    }
+
+    public void setStatus(String status) {
+      svcStatus = status;
+    }
+
+    public String getStatus() {
+      return svcStatus;
+    }
+
+    public void setDesc(String desc) {
+      svcDesc = desc;
+    }
+
+    public String getDesc() {
+      return svcDesc;
+    }
+  }
+
+  public static class ExistingUser {
+    @SerializedName("name")
+    private String name;
+    @SerializedName("homeDir")
+    private String homeDir;
+    @SerializedName("status")
+    private String status;
+
+    public void setUserName(String userName) {
+      name = userName;
+    }
+
+    public String getUserName() {
+      return name;
+    }
+
+    public void setUserHomeDir(String userHomeDir) {
+      homeDir = userHomeDir;
+    }
+
+    public String getUserHomeDir() {
+      return homeDir;
+    }
+
+    public void setUserStatus(String userStatus) {
+      status = userStatus;
+    }
+
+    public String getUserStatus() {
+      return status;
+    }
+  }
 }

+ 22 - 1
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java

@@ -97,6 +97,10 @@ public class HeartBeatHandler {
 
   public HeartBeatResponse handleHeartBeat(HeartBeat heartbeat)
       throws AmbariException {
+    long now = System.currentTimeMillis();
+    if(heartbeat.getAgentEnv() != null && heartbeat.getAgentEnv().getHostHealth() != null) {
+      heartbeat.getAgentEnv().getHostHealth().setServerTimeStampAtReporting(now);
+    }
     String hostname = heartbeat.getHostname();
     Long currentResponseId = hostResponseIds.get(hostname);
     HeartBeatResponse response;
@@ -133,7 +137,6 @@ public class HeartBeatHandler {
     hostResponseIds.put(hostname, currentResponseId);
     hostResponses.put(hostname, response);
 
-    long now = System.currentTimeMillis();
     HostState hostState = hostObject.getState();
     // If the host is waiting for component status updates, notify it
     if (heartbeat.componentStatus.size() > 0
@@ -170,6 +173,7 @@ public class HeartBeatHandler {
     // Send commands if node is active
     if (hostObject.getState().equals(HostState.HEALTHY)) {
       sendCommands(hostname, response);
+      annotateResponse(hostname, response);
     }
     return response;
   }
@@ -453,4 +457,21 @@ public class HeartBeatHandler {
     response.setResponseId(requestId);
     return response;
   }
+
+  /**
+   * Annotate the response with some housekeeping details.
+   * hasMappedComponents - indicates if any components are mapped to the host
+   * @param hostname
+   * @param response
+   * @throws AmbariException
+   */
+  private void annotateResponse(String hostname, HeartBeatResponse response) throws AmbariException {
+    for (Cluster cl : this.clusterFsm.getClustersForHost(hostname)) {
+      List<ServiceComponentHost> scHosts = cl.getServiceComponentHosts(hostname);
+      if (scHosts != null && scHosts.size() > 0) {
+        response.setHasMappedComponents(true);
+        break;
+      }
+    }
+  }
 }

+ 11 - 0
ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java

@@ -38,6 +38,7 @@ public class HeartBeatResponse {
   RegistrationCommand registrationCommand;
 
   boolean restartAgent = false;
+  boolean hasMappedComponents = false;
 
   @JsonProperty("responseId")
   public long getResponseId() {
@@ -89,6 +90,16 @@ public class HeartBeatResponse {
     this.restartAgent = restartAgent;
   }
 
+  @JsonProperty("hasMappedComponents")
+  public boolean hasMappedComponents() {
+    return hasMappedComponents;
+  }
+
+  @JsonProperty("hasMappedComponents")
+  public void setHasMappedComponents(boolean hasMappedComponents) {
+    this.hasMappedComponents = hasMappedComponents;
+  }
+
   public void addExecutionCommand(ExecutionCommand execCmd) {
     executionCommands.add(execCmd);
   }

+ 129 - 53
ambari-server/src/test/java/org/apache/ambari/server/agent/AgentResourceTest.java

@@ -24,8 +24,11 @@ import static org.mockito.Mockito.when;
 
 import javax.ws.rs.core.MediaType;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.persist.jpa.JpaPersistModule;
+
 import junit.framework.Assert;
 
 import org.apache.ambari.server.actionmanager.ActionManager;
@@ -33,7 +36,6 @@ import org.apache.ambari.server.actionmanager.HostRoleCommandFactory;
 import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.agent.rest.AgentResource;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
-import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.state.*;
 import org.apache.ambari.server.state.cluster.ClusterFactory;
 import org.apache.ambari.server.state.cluster.ClusterImpl;
@@ -63,10 +65,10 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
 public class AgentResourceTest extends JerseyTest {
   static String PACKAGE_NAME = "org.apache.ambari.server.agent.rest";
   private static Log LOG = LogFactory.getLog(AgentResourceTest.class);
+  protected Client client;
   HeartBeatHandler handler;
   ActionManager actionManager;
   Injector injector;
-  protected Client client;
   AmbariMetaInfo ambariMetaInfo;
 
   public AgentResourceTest() {
@@ -75,6 +77,131 @@ public class AgentResourceTest extends JerseyTest {
         .build());
   }
 
+  public static <T> T getJsonFormString(String json, Class<T> type) {
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    gsonBuilder.serializeNulls();
+    Gson gson = gsonBuilder.create();
+    return (T) gson.fromJson(json, type);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    handler = mock(HeartBeatHandler.class);
+    injector = Guice.createInjector(new MockModule());
+    injector.injectMembers(handler);
+  }
+
+  private JSONObject createDummyJSONRegister() throws JSONException {
+    JSONObject json = new JSONObject();
+    json.put("responseId", -1);
+    json.put("timestamp", System.currentTimeMillis());
+    json.put("hostname", "dummyHost");
+    return json;
+  }
+
+  private JSONObject createDummyHeartBeat() throws JSONException {
+    JSONObject json = new JSONObject();
+    json.put("responseId", -1);
+    json.put("timestamp", System.currentTimeMillis());
+    json.put("hostname", "dummyHost");
+    return json;
+  }
+
+  private JSONObject createDummyHeartBeatWithAgentEnv() throws JSONException {
+    JSONObject json = new JSONObject();
+    json.put("responseId", -1);
+    json.put("timestamp", System.currentTimeMillis());
+    json.put("hostname", "dummyHost");
+
+    JSONObject agentEnv = new JSONObject();
+    json.put("agentEnv", agentEnv);
+    return json;
+  }
+
+  @Test
+  public void agentRegistration() throws UniformInterfaceException, JSONException {
+    RegistrationResponse response;
+    ClientConfig clientConfig = new DefaultClientConfig();
+    clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
+    client = Client.create(clientConfig);
+    WebResource webResource = client.resource("http://localhost:9998/register/dummyhost");
+    response = webResource.type(MediaType.APPLICATION_JSON)
+        .post(RegistrationResponse.class, createDummyJSONRegister());
+    LOG.info("Returned from Server responce=" + response);
+    Assert.assertEquals(response.getResponseStatus(), RegistrationStatus.OK);
+  }
+
+  @Test
+  public void agentHeartBeat() throws UniformInterfaceException, JSONException {
+    HeartBeatResponse response;
+    ClientConfig clientConfig = new DefaultClientConfig();
+    clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
+    client = Client.create(clientConfig);
+    WebResource webResource = client.resource("http://localhost:9998/heartbeat/dummyhost");
+    response = webResource.type(MediaType.APPLICATION_JSON)
+        .post(HeartBeatResponse.class, createDummyHeartBeat());
+    LOG.info("Returned from Server: "
+        + " response=" + response);
+    Assert.assertEquals(response.getResponseId(), 0L);
+  }
+
+  @Test
+  public void agentHeartBeatWithEnv() throws UniformInterfaceException, JSONException {
+    HeartBeatResponse response;
+    ClientConfig clientConfig = new DefaultClientConfig();
+    clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
+    client = Client.create(clientConfig);
+    WebResource webResource = client.resource("http://localhost:9998/heartbeat/dummyhost");
+    response = webResource.type(MediaType.APPLICATION_JSON)
+        .post(HeartBeatResponse.class, createDummyHeartBeatWithAgentEnv());
+    LOG.info("Returned from Server: "
+        + " response=" + response);
+    Assert.assertEquals(response.getResponseId(), 0L);
+  }
+
+  @Test
+  public void deserializeClasses() {
+    AgentEnv.Directory[] dirs = getJsonFormString(
+        "[{name:'/var/lib', type:'directory'},{name:'b', type:'directory'}]",
+        AgentEnv.Directory[].class);
+    Assert.assertEquals("/var/lib", dirs[0].getName());
+    Assert.assertEquals("directory", dirs[1].getType());
+
+    AgentEnv.PackageDetail[] pkgs = getJsonFormString(
+        "[{name:'abc', version:'2.3', repoName:'HDP'},{name:'abc', version:'3.3', repoName:'HDP-epel'}]",
+        AgentEnv.PackageDetail[].class);
+    Assert.assertEquals("abc", pkgs[0].getName());
+    Assert.assertEquals("HDP", pkgs[0].getRepoName());
+    Assert.assertEquals("3.3", pkgs[1].getVersion());
+
+    AgentEnv.ExistingUser[] users = getJsonFormString(
+        "[{name:'hdfs', homeDir:'/var/lib/hadoop', status:''}, " +
+            "{name:'ambari_qa', homeDir:'/var/home/ambari_qa',status:'None'}]",
+        AgentEnv.ExistingUser[].class);
+    Assert.assertEquals("hdfs", users[0].getUserName());
+    Assert.assertEquals("/var/lib/hadoop", users[0].getUserHomeDir());
+    Assert.assertEquals("None", users[1].getUserStatus());
+
+    AgentEnv.JavaProc[] procs = getJsonFormString(
+        "[{user:'root', pid:'355', hadoop:'True', command:'cmd'}, " +
+            "{user:'hdfs', pid:'325', hadoop:'False', command:'cmd = 2'}]",
+        AgentEnv.JavaProc[].class);
+    Assert.assertEquals("root", procs[0].getUser());
+    Assert.assertEquals(355, procs[0].getPid());
+    Assert.assertEquals("cmd = 2", procs[1].getCommand());
+    Assert.assertEquals(false, procs[1].isHadoop());
+
+    AgentEnv.Alternative[] alternatives = getJsonFormString(
+        "[{name:'/etc/alternatives/hdfs-conf', target:'/etc/hadoop/conf.dist'}, " +
+            "{name:'abc', target:'def'}]",
+        AgentEnv.Alternative[].class);
+    Assert.assertEquals("/etc/alternatives/hdfs-conf", alternatives[0].getName());
+    Assert.assertEquals("/etc/hadoop/conf.dist", alternatives[0].getTarget());
+    Assert.assertEquals("abc", alternatives[1].getName());
+    Assert.assertEquals("def", alternatives[1].getTarget());
+  }
+
   public class MockModule extends AbstractModule {
 
     RegistrationResponse response = new RegistrationResponse();
@@ -125,55 +252,4 @@ public class AgentResourceTest extends JerseyTest {
       install(new FactoryModuleBuilder().build(HostRoleCommandFactory.class));
     }
   }
-
-  @Override
-  public void setUp() throws Exception {
-    super.setUp();
-    handler = mock(HeartBeatHandler.class);
-    injector = Guice.createInjector(new MockModule());
-    injector.injectMembers(handler);
-  }
-
-  private JSONObject createDummyJSONRegister() throws JSONException {
-    JSONObject json = new JSONObject();
-    json.put("responseId" , -1);
-    json.put("timestamp" , System.currentTimeMillis());
-    json.put("hostname",   "dummyHost");
-    return json;
-  }
-
-  private JSONObject createDummyHeartBeat() throws JSONException {
-    JSONObject json = new JSONObject();
-    json.put("responseId", -1);
-    json.put("timestamp" , System.currentTimeMillis());
-    json.put("hostname", "dummyHost");
-    return json;
-  }
-
-  @Test
-  public void agentRegistration() throws UniformInterfaceException, JSONException {
-    RegistrationResponse response;
-    ClientConfig clientConfig = new DefaultClientConfig();
-    clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
-    client = Client.create(clientConfig);
-    WebResource webResource = client.resource("http://localhost:9998/register/dummyhost");
-    response = webResource.type(MediaType.APPLICATION_JSON)
-      .post(RegistrationResponse.class, createDummyJSONRegister());
-    LOG.info("Returned from Server responce=" + response);
-    Assert.assertEquals(response.getResponseStatus(), RegistrationStatus.OK);
-  }
-
-  @Test
-  public void agentHeartBeat() throws UniformInterfaceException, JSONException {
-    HeartBeatResponse response;
-    ClientConfig clientConfig = new DefaultClientConfig();
-    clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
-    client = Client.create(clientConfig);
-    WebResource webResource = client.resource("http://localhost:9998/heartbeat/dummyhost");
-    response = webResource.type(MediaType.APPLICATION_JSON)
-        .post(HeartBeatResponse.class, createDummyHeartBeat());
-    LOG.info("Returned from Server: "
-        + " response=" +   response);
-    Assert.assertEquals(response.getResponseId(), 0L);
-  }
 }

+ 51 - 0
ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java

@@ -289,6 +289,55 @@ public class TestHeartbeatHandler {
     assertEquals(State.STARTED, componentState3);
   }
 
+  @Test
+  public void testStatusHeartbeatWithAnnotation() throws Exception {
+    ActionManager am = getMockActionManager();
+
+    Cluster cluster = getDummyCluster();
+
+    @SuppressWarnings("serial")
+    Set<String> hostNames = new HashSet<String>(){{
+      add(DummyHostname1);
+    }};
+    clusters.mapHostsToCluster(hostNames, DummyCluster);
+    Service hdfs = cluster.addService(HDFS);
+    hdfs.persist();
+    hdfs.addServiceComponent(DATANODE).persist();
+    hdfs.addServiceComponent(NAMENODE).persist();
+    hdfs.addServiceComponent(SECONDARY_NAMENODE).persist();
+
+    ActionQueue aq = new ActionQueue();
+    HeartBeatHandler handler = getHeartBeatHandler(am, aq);
+
+    HeartBeat hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(0);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb.setReports(new ArrayList<CommandReport>());
+    ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
+    hb.setComponentStatus(componentStatuses);
+
+    HeartBeatResponse resp = handler.handleHeartBeat(hb);
+    Assert.assertFalse(resp.hasMappedComponents());
+
+    hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1).persist();
+    ServiceComponentHost serviceComponentHost1 = clusters.getCluster(DummyCluster).getService(HDFS).
+        getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1);
+    serviceComponentHost1.setState(State.INIT);
+
+    hb = new HeartBeat();
+    hb.setTimestamp(System.currentTimeMillis());
+    hb.setResponseId(1);
+    hb.setHostname(DummyHostname1);
+    hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
+    hb.setReports(new ArrayList<CommandReport>());
+    hb.setComponentStatus(componentStatuses);
+
+    resp = handler.handleHeartBeat(hb);
+    Assert.assertTrue(resp.hasMappedComponents());
+  }
+
   @Test
   public void testLiveStatusUpdateAfterStopFailed() throws Exception {
     ActionManager am = getMockActionManager();
@@ -989,6 +1038,7 @@ public class TestHeartbeatHandler {
     hb.setHostname(DummyHostname1);
     hb.setNodeStatus(new HostStatus(Status.HEALTHY, DummyHostStatus));
     hb.setReports(new ArrayList<CommandReport>());
+    hb.setAgentEnv(new AgentEnv());
 
     ArrayList<ComponentStatus> componentStatuses = new ArrayList<ComponentStatus>();
     ComponentStatus componentStatus1 = createComponentStatus(DummyCluster, HDFS, DummyHostStatus, State.STARTED,
@@ -1012,6 +1062,7 @@ public class TestHeartbeatHandler {
         stack122, serviceComponentHost2.getStackVersion());
     assertEquals("Matching value " + serviceComponentHost3.getStackVersion(),
         stack130, serviceComponentHost3.getStackVersion());
+    assertTrue(hb.getAgentEnv().getHostHealth().getServerTimeStampAtReporting() >= hb.getTimestamp());
   }
 
   @Test

+ 2 - 3
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java

@@ -173,9 +173,8 @@ public class ClusterTest {
     Directory dir2 = new Directory();
     dir2.setName("/var/log/hadoop");
     dir2.setType("not_exist");
-    agentEnv.setPaths(new Directory[] { dir1, dir2 });
-    
-    
+    agentEnv.setStackFoldersAndFiles(new Directory[] { dir1, dir2 });
+
     AgentVersion agentVersion = new AgentVersion("0.0.x");
     long currentTime = 1001;
 

+ 6 - 0
ambari-web/app/controllers/wizard/step3_controller.js

@@ -713,6 +713,12 @@ App.WizardStep3Controller = Em.Controller.extend({
         console.log("last_agent_env is missing for " + host.Hosts.host_name + ".  Skipping host check.");
         return;
       }
+      
+      // TODO - Remove when correct parsing code in place.
+      if (!host.Hosts.last_agent_env.paths) {
+        return;
+      }
+      
       host.Hosts.last_agent_env.paths.forEach(function(path){
         var parsedPath = {
           name: path.name,