
AMBARI-1201. Improve Agent Registration and Heartbeat json. (Nate Cole via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/incubator/ambari/trunk@1436531 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 12 years ago
parent
commit
18b019898c

+ 3 - 0
CHANGES.txt

@@ -130,6 +130,9 @@ Trunk (unreleased changes):
  AMBARI-1231. Replace sudo with su in the ambari setup script since ambari
  server setup is already run as root. (mahadev)
 
+ AMBARI-1201. Improve Agent Registration and Heartbeat json. (Nate Cole via
+ mahadev)
+
 AMBARI-1.2.0 branch:
 
  INCOMPATIBLE CHANGES

+ 10 - 0
ambari-agent/conf/unix/ambari-agent.ini

@@ -40,3 +40,13 @@ passphrase_env_var_name=AMBARI_PASSPHRASE
 
 [services]
 pidLookupPath=/var/run/
+
+[heartbeat]
+state_interval=6
+dirs=/etc/hadoop,/etc/hadoop/conf,/etc/hbase,/etc/hcatalog,/etc/hive,/etc/oozie,
+  /etc/sqoop,/etc/ganglia,/etc/nagios,
+  /var/run/hadoop,/var/run/zookeeper,/var/run/hbase,/var/run/templeton,/var/run/oozie,
+  /var/log/hadoop,/var/log/zookeeper,/var/log/hbase,/var/log/templeton,/var/log/hive,
+  /var/log/nagios
+rpms=yum,rpm,openssl,curl,wget,net-snmp,ntpd,ruby,puppet,nagios,ganglia,passenger,
+  hadoop,hbase,oozie,sqoop,pig,zookeeper,hive,libconfuse,postgresql,httpd,apache2,http-server
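
Note: the new [heartbeat] section is plain INI. ConfigParser folds the indented continuation lines under dirs and rpms into a single value, which is why the agent code splits on ',' and strips each entry. A minimal sketch of reading it (this assumes the installed config lives at /etc/ambari-agent/conf/ambari-agent.ini; the snippet is illustrative, not part of this patch):

import ConfigParser

config = ConfigParser.ConfigParser()
config.read('/etc/ambari-agent/conf/ambari-agent.ini')

state_interval = config.getint('heartbeat', 'state_interval')
# continuation lines are joined with newlines, so strip each entry
dirs = [d.strip() for d in config.get('heartbeat', 'dirs').split(',')]
rpms = [r.strip() for r in config.get('heartbeat', 'rpms').split(',')]

print state_interval     # 6
print dirs[0], dirs[-1]  # /etc/hadoop /var/log/nagios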

+ 5 - 0
ambari-agent/src/main/python/ambari_agent/AmbariConfig.py

@@ -53,6 +53,11 @@ sleepBetweenRetries=1
 keysdir=/tmp/ambari-agent
 server_crt=ca.crt
 passphrase_env_var_name=AMBARI_PASSPHRASE
+
+[heartbeat]
+state_interval = 6
+dirs=/etc/hadoop,/etc/hadoop/conf,/var/run/hadoop,/var/log/hadoop
+rpms=hadoop,openssl,wget,net-snmp,ntpd,ruby,ganglia,nagios
 """
 s = StringIO.StringIO(content)
 config.readfp(s)

+ 4 - 1
ambari-agent/src/main/python/ambari_agent/Controller.py

@@ -135,12 +135,15 @@ class Controller(threading.Thread):
     retry = False
     certVerifFailed = False
 
+    config = AmbariConfig.config
+    hb_interval = config.get('heartbeat', 'state_interval')
+
     #TODO make sure the response id is monotonically increasing
     id = 0
     while not self.DEBUG_STOP_HEARTBITTING:
       try:
         if not retry:
-          data = json.dumps(self.heartbeat.build(self.responseId))
+          data = json.dumps(self.heartbeat.build(self.responseId, int(hb_interval)))
           pass
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1

+ 1 - 1
ambari-agent/src/main/python/ambari_agent/Heartbeat.py

@@ -54,7 +54,7 @@ class Heartbeat:
                   'hostname'          : socket.getfqdn(),
                   'nodeStatus'        : nodeStatus
                 }
-    if (int(id) >= 0) and (int(id) % state_interval) == 0:
+    if (int(id) >= 0) and state_interval > 0 and (int(id) % state_interval) == 0:
       hostInfo = HostInfo()
       nodeInfo = { }
       # for now, just do the same work as registration
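
Note: together with the Controller change above, this guard means the heavier host scan runs only on the first heartbeat and on every state_interval-th response after that, and an interval misconfigured to 0 no longer raises ZeroDivisionError. A sketch of the gating logic, with names simplified from Heartbeat.build:

def includes_host_scan(response_id, state_interval):
  # mirrors the guard added above
  return (int(response_id) >= 0 and state_interval > 0
          and int(response_id) % state_interval == 0)

assert includes_host_scan(0, 6)      # first heartbeat: full agentEnv
assert not includes_host_scan(3, 6)  # lightweight heartbeat
assert includes_host_scan(12, 6)     # every 6th thereafter
assert not includes_host_scan(6, 0)  # interval 0: scan disabled, no crash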

+ 92 - 39
ambari-agent/src/main/python/ambari_agent/HostInfo.py

@@ -21,6 +21,8 @@ limitations under the License.
 import os
 import glob
 import pwd
+import subprocess
+import AmbariConfig
 
 class HostInfo:
 
@@ -31,10 +33,33 @@ class HostInfo:
       return 'sym_link'
     elif os.path.isdir(path):
       return 'directory'
+    elif os.path.isfile(path):
+      return 'file'
     return 'unknown'
 
-  def hadoopDir(self):
-    return self.dirType('/etc/hadoop')
+  def rpmInfo(self, rpmList):
+    config = AmbariConfig.config
+
+    try:
+      for rpmName in config.get('heartbeat', 'rpms').split(','):
+        rpmName = rpmName.strip()
+        rpm = { }
+        rpm['name'] = rpmName
+
+        try:
+          osStat = subprocess.Popen(["rpm", "-q", rpmName], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+          out, err = osStat.communicate()
+          if (0 != osStat.returncode or 0 == len(out.strip())):
+            rpm['installed'] = False
+          else:
+            rpm['installed'] = True
+            rpm['version'] = out.strip()
+        except:
+          rpm['installed'] = False
+
+        rpmList.append(rpm)
+    except:
+      pass
 
   def hadoopVarRunCount(self):
     if not os.path.exists('/var/run/hadoop'):
@@ -47,60 +72,88 @@ class HostInfo:
       return 0
     logs = glob.glob('/var/log/hadoop/*/*.log')
     return len(logs)
+  
+  def etcAlternativesConf(self, etcList):
+    if not os.path.exists('/etc/alternatives'):
+      return []
+    confs = glob.glob('/etc/alternatives/*conf')
+
+    for conf in confs:
+      confinfo = { }
+      realconf = conf
+      if os.path.islink(conf):
+        realconf = os.path.realpath(conf)
+      confinfo['name'] = conf
+      confinfo['target'] = realconf
+      etcList.append(confinfo)
+
+  def repos(self):
+    # centos, redhat
+    try:
+      osStat = subprocess.Popen(["yum", "-C", "repolist"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      out, err = osStat.communicate()
+      return out
+    except:
+      pass
+    # suse, only if above failed
+    try:
+      osStat = subprocess.Popen(["zypper", "repos"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+      out, err = osStat.communicate()
+      return out
+    except:
+      pass
 
-  def dirHelper(self, dict, name, prefix):
-    dict[name] = self.dirType(os.path.join(prefix, name))
+    # default, never return empty
+    return "could_not_determine"
+    
 
   def register(self, dict):
     dict['varLogHadoopLogCount'] = self.hadoopVarLogCount()
     dict['varRunHadoopPidCount'] = self.hadoopVarRunCount()
-
-    etcDirs = { }
-    self.dirHelper(etcDirs, 'hadoop', '/etc')
-    etcDirs['hadoop_conf'] = self.dirType('/etc/hadoop/conf')
-    self.dirHelper(etcDirs, 'hbase', '/etc')
-    self.dirHelper(etcDirs, 'hcatalog', '/etc')
-    self.dirHelper(etcDirs, 'hive', '/etc')
-    self.dirHelper(etcDirs, 'oozie', '/etc')
-    self.dirHelper(etcDirs, 'sqoop', '/etc')
-    self.dirHelper(etcDirs, 'ganglia', '/etc')
-    self.dirHelper(etcDirs, 'nagios', '/etc')
-    dict['etcDirs'] = etcDirs
     
-    varRunDirs = { }
-    self.dirHelper(varRunDirs, 'hadoop', '/var/run')
-    self.dirHelper(varRunDirs, 'zookeeper', '/var/run')
-    self.dirHelper(varRunDirs, 'hbase', '/var/run')
-    self.dirHelper(varRunDirs, 'templeton', '/var/run')
-    self.dirHelper(varRunDirs, 'oozie', '/var/run')
-    dict['varRunDirs'] = varRunDirs
-
-    varLogDirs = { }
-    self.dirHelper(varLogDirs, 'hadoop', '/var/log')
-    self.dirHelper(varLogDirs, 'zookeeper', '/var/log')
-    self.dirHelper(varLogDirs, 'hbase', '/var/log')
-    self.dirHelper(varLogDirs, 'hive', '/var/log')
-    self.dirHelper(varLogDirs, 'templeton', '/var/log')
-    self.dirHelper(varLogDirs, 'nagios', '/var/log')
-    dict['varLogDirs'] = varLogDirs
+    etcs = []
+    self.etcAlternativesConf(etcs)
+    dict['etcAlternativesConf'] = etcs
+
+    dirs = []
+    config = AmbariConfig.config
+    try:
+      for dirName in config.get('heartbeat', 'dirs').split(','):
+        obj = { }
+        obj['type'] = self.dirType(dirName.strip())
+        obj['name'] = dirName.strip()
+        dirs.append(obj)
+    except:
+      pass
+
+    dict['paths'] = dirs
 
     java = []
-    self.hadoopJava(java)
-    dict['hadoopJavaProcs'] = java
+    self.javaProcs(java)
+    dict['javaProcs'] = java
+
+    rpms = []
+    self.rpmInfo(rpms)
+    dict['rpms'] = rpms
+
+    dict['repoInfo'] = self.repos()
     
-  def hadoopJava(self, list):
+  def javaProcs(self, list):
     try:
       pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
       for pid in pids:
         cmd = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
-        if 'java' in cmd and 'hadoop' in cmd:
+        cmd = cmd.replace('\0', ' ')
+        if 'java' in cmd:
+          dict = { }
+          dict['pid'] = int(pid)
+          dict['hadoop'] = 'hadoop' in cmd
+          dict['command'] = cmd.strip()
           for line in open(os.path.join('/proc', pid, 'status')):
             if line.startswith('Uid:'):
               uid = int(line.split()[1])
-              dict = { }
               dict['user'] = pwd.getpwuid(uid).pw_name
-              dict['pid'] = int(pid)
-              list.append(dict)
+          list.append(dict)
     except:
       pass
     pass
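
Note: the dict filled by register() is serialized as the agentEnv payload and deserialized into the server-side AgentEnv bean below. An illustrative fragment, with field names taken from the code above and values invented for the example:

agentEnv = {
  'varLogHadoopLogCount': 4,
  'varRunHadoopPidCount': 2,
  'etcAlternativesConf': [
    {'name': '/etc/alternatives/hadoop-conf', 'target': '/etc/hadoop/conf.empty'}
  ],
  'paths': [
    {'name': '/etc/hadoop', 'type': 'directory'},
    {'name': '/etc/hbase', 'type': 'not_exist'}
  ],
  'javaProcs': [
    {'pid': 4611, 'hadoop': True, 'user': 'hdfs', 'command': 'java ... NameNode'}
  ],
  'rpms': [
    {'name': 'hadoop', 'installed': True, 'version': 'hadoop-1.2.0-1'},
    {'name': 'httpd', 'installed': False}
  ],
  'repoInfo': 'repo id      repo name      status ...'
}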

+ 0 - 1
ambari-agent/src/main/python/ambari_agent/Register.py

@@ -61,7 +61,6 @@ class Register:
                   'hardwareProfile'   : self.hardware.get(),
                   'agentEnv'          : agentEnv
                 }
-    print str(time.time())
     return register
 
 def doExec(vals, key, command, preLF=False):

+ 2 - 1
ambari-agent/src/test/python/TestHeartbeat.py

@@ -43,7 +43,8 @@ class TestHeartbeat(TestCase):
     self.assertEquals(len(result['nodeStatus']), 2)
     self.assertEquals(result['nodeStatus']['cause'], "NONE")
     self.assertEquals(result['nodeStatus']['status'], "HEALTHY")
-    self.assertEquals(len(result), 7)
+    # result may or may not include the agentEnv structure
+    self.assertEquals(len(result) in (6, 7), True)
     self.assertEquals(not heartbeat.reports, True, "Heartbeat should not contain task in progress")
 
 

+ 159 - 39
ambari-server/src/main/java/org/apache/ambari/server/agent/AgentEnv.java

@@ -17,61 +17,57 @@
  */
 package org.apache.ambari.server.agent;
 
-import java.util.Map;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
 
 /**
  * Agent environment data.
  */
 public class AgentEnv {
+
   /**
-   * Various directories in /etc
-   */
-  private Map<String, String> etcDirs = null;
-  /**
-   * Various directories in /var/run
+   * Various directories, configurable in <code>ambari-agent.ini</code>
    */
-  private Map<String, String> varRunDirs = null;
+  private Directory[] paths = new Directory[0];
+
   /**
-   * Various directories in /var/log
+   * Java processes running on the system.  Default empty array.
    */
-  private Map<String, String> varLogDirs = null;
-
+  private JavaProc[] javaProcs = new JavaProc[0];
+  
   /**
-   * Java processes with the word "hadoop" in them, users and pids
+   * Various RPM package versions.
    */
-  private JavaProc[] hadoopJavaProcs = null;
+  private Rpm[] rpms = new Rpm[0];
+  
   /**
-   * Number of pid files found in /var/run/hadoop
+   * Number of pid files found in <code>/var/run/hadoop</code>
    */
   private int varRunHadoopPidCount = 0;
+  
   /**
-   * Number of log files found in /var/log/hadoop
+   * Number of log files found in <code>/var/log/hadoop</code>
    */
   private int varLogHadoopLogCount = 0;
 
+  /**
+   * Directories that match name <code>/etc/alternatives/*conf</code>
+   */
+  private Alternative[] etcAlternativesConf = new Alternative[0];
+
+  /**
+   * Output for repo listing.  Command to do this varies, but for RHEL it is
+   * <code>yum -C repolist</code>
+   */
+  private String repoInfo;
   
-  public Map<String, String> getEtcDirs() {
-    return etcDirs;
-  }
-  
-  public void setEtcDirs(Map<String, String> dirs) {
-    etcDirs = dirs;
-  }
-  
-  public Map<String, String> getVarRunDirs() {
-    return varRunDirs;
-  }
-  
-  public void setVarRunDirs(Map<String, String> dirs) {
-    varRunDirs = dirs;
-  }
-  
-  public Map<String, String> getVarLogDirs() {
-    return varLogDirs;
+
+  public Directory[] getPaths() {
+    return paths;
   }
   
-  public void setVarLogDirs(Map<String, String> dirs) {
-    varLogDirs = dirs;
+  public void setPaths(Directory[] dirs) {
+    paths = dirs;
   }
   
   public void setVarRunHadoopPidCount(int count) {
@@ -90,17 +86,104 @@ public class AgentEnv {
     return varLogHadoopLogCount;
   }
   
-  public void setHadoopJavaProcs(JavaProc[] procs) {
-    hadoopJavaProcs = procs;
+  public void setJavaProcs(JavaProc[] procs) {
+    javaProcs = procs;
+  }
+  
+  public JavaProc[] getJavaProcs() {
+    return javaProcs;
+  }
+  
+  public void setRpms(Rpm[] rpm) {
+    rpms = rpm;
+  }
+  
+  public Rpm[] getRpms() {
+    return rpms;
+  }
+  
+  public void setEtcAlternativesConf(Alternative[] dirs) {
+    etcAlternativesConf = dirs;
+  }
+  
+  public Alternative[] getEtcAlternativesConf() {
+    return etcAlternativesConf;
+  }
+  
+  public void setRepoInfo(String info) {
+    repoInfo = info;
+  }
+  
+  public String getRepoInfo() {
+    return repoInfo;
+  }
+  
+  /**
+   * Represents information about rpm-installed packages
+   */
+  public static class Rpm {
+    private String rpmName;
+    private boolean rpmInstalled = false;
+    private String rpmVersion;
+    
+    public void setName(String name) {
+      rpmName = name;
+    }
+    
+    public String getName() {
+      return rpmName;
+    }
+    
+    public void setInstalled(boolean installed) {
+      rpmInstalled = installed;
+    }
+    
+    public boolean isInstalled() {
+      return rpmInstalled;
+    }
+    
+    public void setVersion(String version) {
+      rpmVersion = version;
+    }
+    
+    @JsonSerialize(include=Inclusion.NON_NULL)
+    public String getVersion() {
+      return rpmVersion;
+    }
   }
   
-  public JavaProc[] getHadoopJavaProcs() {
-    return hadoopJavaProcs;
+  /**
+   * Represents information about a directory of interest.
+   */
+  public static class Directory {
+    private String dirName;
+    private String dirType;
+    
+    public void setName(String name) {
+      dirName = name;
+    }
+    
+    public String getName() {
+      return dirName;
+    }
+    
+    public void setType(String type) {
+      dirType = type;
+    }
+    
+    public String getType() {
+      return dirType;
+    }
   }
   
+  /**
+   * Represents information about running java processes.
+   */
   public static class JavaProc {
     private String user;
-    private int pid;
+    private int pid = 0;
+    private boolean isHadoop = false;
+    private String command;
     
     public void setUser(String user) {
       this.user = user;
@@ -117,6 +200,43 @@ public class AgentEnv {
     public int getPid() {
       return pid;
     }
+    
+    public void setHadoop(boolean hadoop) {
+      isHadoop = hadoop;
+    }
+    
+    public boolean isHadoop() {
+      return isHadoop;
+    }
+    
+    public void setCommand(String cmd) {
+      command = cmd;
+    }
+    
+    public String getCommand() {
+      return command;
+    }
+  }
+  
+  public static class Alternative {
+    private String altName;
+    private String altTarget;
+    
+    public void setName(String name) {
+      altName = name;
+    }
+    
+    public String getName() {
+      return altName;
+    }
+    
+    public void setTarget(String target) {
+      altTarget = target;
+    }
+    
+    public String getTarget() {
+      return altTarget;
+    }
   }
   
 }

+ 8 - 9
ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterTest.java

@@ -29,6 +29,7 @@ import junit.framework.Assert;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.agent.AgentEnv;
+import org.apache.ambari.server.agent.AgentEnv.Directory;
 import org.apache.ambari.server.agent.DiskInfo;
 import org.apache.ambari.server.agent.HostInfo;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
@@ -143,17 +144,15 @@ public class ClusterTest {
     hostInfo.setMounts(mounts);
 
     AgentEnv agentEnv = new AgentEnv();
-    Map<String, String> etcDirs = new HashMap<String, String>();
-    etcDirs.put("hadoop", "not_exist");
-    agentEnv.setEtcDirs(etcDirs);
     
-    Map<String, String> varRunDirs = new HashMap<String, String>();
-    varRunDirs.put("hadoop", "not_exist");
-    agentEnv.setVarRunDirs(varRunDirs);
+    Directory dir1 = new Directory();
+    dir1.setName("/etc/hadoop");
+    dir1.setType("not_exist");
+    Directory dir2 = new Directory();
+    dir2.setName("/var/log/hadoop");
+    dir2.setType("not_exist");
+    agentEnv.setPaths(new Directory[] { dir1, dir2 });
     
-    Map<String, String> varLogDirs = new HashMap<String, String>();
-    varLogDirs.put("hadoop", "not_exist");
-    agentEnv.setVarLogDirs(varLogDirs);
     
     AgentVersion agentVersion = new AgentVersion("0.0.x");
     long currentTime = 1001;