Browse files

HADOOP-2720. Jumbo bug fix patch to HOD. Final sync of Apache SVN with internal Yahoo SVN. Contributed by Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@615919 13f79535-47bb-0310-9956-ffa450edef68
Nigel Daley, 17 years ago
parent
commit
5dae068144

+ 3 - 0
CHANGES.txt

@@ -591,6 +591,9 @@ Trunk (unreleased changes)
     HADOOP-2576. Namenode performance degradation over time triggered by
     large heartbeat interval. (Raghu Angadi)
 
+    HADOOP-2720. Jumbo bug fix patch to HOD.  Final sync of Apache SVN with
+    internal Yahoo SVN.  (Hemanth Yamijala via nigel)
+
 Release 0.15.3 - 2008-01-18
 
   BUG FIXES

+ 1 - 16
src/contrib/hod/bin/VERSION

@@ -1,16 +1 @@
-#Licensed to the Apache Software Foundation (ASF) under one
-#or more contributor license agreements.  See the NOTICE file
-#distributed with this work for additional information
-#regarding copyright ownership.  The ASF licenses this file
-#to you under the Apache License, Version 2.0 (the
-#"License"); you may not use this file except in compliance
-#with the License.  You may obtain a copy of the License at
-
-#     http://www.apache.org/licenses/LICENSE-2.0
-
-#Unless required by applicable law or agreed to in writing, software
-#distributed under the License is distributed on an "AS IS" BASIS,
-#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#See the License for the specific language governing permissions and
-#limitations under the License.
-DEVELOPMENT
+0.4.0

+ 155 - 126
src/contrib/hod/bin/hod

@@ -45,7 +45,9 @@ sys.path.append(libDirectory)
 from hodlib.Hod.hod import hodRunner
 from hodlib.Common.setup import *
 from hodlib.Common.descGenerator import *
-from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings, get_exception_error_string
+from hodlib.Common.util import local_fqdn, need_to_allocate, filter_warnings,\
+    get_exception_error_string, hodInterrupt, \
+    HOD_INTERRUPTED_MESG, HOD_INTERRUPTED_CODE
 from hodlib.Common.tcp import tcpError, tcpSocket
 
 filter_warnings()
@@ -91,7 +93,8 @@ defList = { 'hod' : (
               False, True, False, True, 's'),
 
              ('min-nodes', 'pos_int', 
-              'Minimum number of nodes to allocate at startup.',
+              'Minimum number of nodes to allocate at startup. ' + \
+              'Used with hod.script option',
               True, None, False, True, 'm'),
 
              ('script', 'file', 'Hadoop script to execute.',
@@ -124,10 +127,25 @@ defList = { 'hod' : (
               False, None, True, True),
 
              ('client-params', 'keyval', 'Hadoop client xml key/value list',
-              False, None, False, True, 'C'), 
+              True, None, False, True, 'C'), 
 
              ('hadoop-ui-log-dir', 'directory', 'Directory to store Web UI Logs of Hadoop',
-              False, None, False, True)),
+              True, None, False, True),
+
+             ('temp-dir', 'directory', 'HOD temporary directories.',
+              False, None, True, False),
+
+             ('update-worker-info', 'bool', 'Specifies whether to update Worker Info after allocation',
+              False, False, False, True),
+
+             ('title', 'string', 'Title for the current HOD allocation.',
+               True, "HOD", False, True, 'N'),
+
+             ('walltime', 'pos_int', 'Walltime in seconds for the current HOD allocation',
+              True, None, False, True),
+
+             ('script-wait-time', 'pos_int', 'Specifies the time to wait before running the script. Used with the hod.script option.',
+              True, 10, False, True, 'W')),
 
             'resource_manager' : (
              ('id', 'string', 'Batch scheduler ID: torque|condor.',
@@ -137,7 +155,7 @@ defList = { 'hod' : (
               False, None, False, True),
               
              ('pbs-account', 'string', 'User Account jobs are submitted under.',
-              True, pwd.getpwuid(os.getuid())[0], False, False, 'A'),
+              True, None, False, False, 'A'),
               
              ('queue', 'string', 'Queue of the batch scheduler to query.',
               True, 'batch', False, True, 'Q'),
@@ -215,7 +233,7 @@ defList = { 'hod' : (
               False, None, False, False),
 
              ('server-params', 'keyval', 'Hadoop xml key/value list',
-              False, None, False, True, 'M'),
+              True, None, False, True, 'M'),
                
              ('envs', 'keyval', 'environment to run this package in',
               False, None, False, False),
@@ -344,140 +362,151 @@ def op_requires_pkgs(config):
     return config['hod'].has_key('script')
 
 if __name__ == '__main__':  
-  confDef = definition()
-  confDef.add_defs(defList, defOrder)
-  hodOptions = options(confDef, "./%s -c <CONFIG_FILE> [OPTIONS]" % myName,
-                       VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG)
-
-  # hodConfig is a dict like object, hodConfig[section][name]
   try:
-    hodConfig = config(hodOptions['config'], configDef=confDef, 
-                     originalDir=hodOptions['hod']['original-dir'],
-                     options=hodOptions) 
-  except IOError, e:
-    print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
-    sys.exit(1)
-
-  status = True
-  statusMsgs = []
-
-  (status,statusMsgs) = hodConfig.verify()
-  if not status:
-    print >>sys.stderr,"error: bin/hod failed to start."
-    for msg in statusMsgs:
-      print >>sys.stderr,"%s" % (msg)
-    sys.exit(1)
-
-  ## TODO : should move the dependency verification to hodConfig.verify
-  if hodConfig['hod'].has_key('script') \
-    and not hodConfig['hod'].has_key('min-nodes'):
-    printErrors(hodConfig.var_error('hod', 'min-nodes',
-        "hod.min-nodes must be specified when using hod.script option."))
-    sys.exit(1)
-
-  if hodConfig['hod'].has_key('min-nodes'):
-    if hodConfig['hod']['min-nodes'] < 3:
-      printErrors(hodConfig.var_error('hod', 'min-nodes',
-        "hod.min-nodes must be >= 3 nodes: %s." % 
-        hodConfig['hod']['min-nodes']))
+    confDef = definition()
+    confDef.add_defs(defList, defOrder)
+    hodOptions = options(confDef, "./%s -c <CONFIG_FILE> [OPTIONS]" % myName,
+                         VERSION, withConfig=True, defaultConfig=DEFAULT_CONFIG)
+  
+    # hodConfig is a dict like object, hodConfig[section][name]
+    try:
+      hodConfig = config(hodOptions['config'], configDef=confDef, 
+                       originalDir=hodOptions['hod']['original-dir'],
+                       options=hodOptions) 
+    except IOError, e:
+      print >>sys.stderr,"error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found." % (hodOptions['config'], 'HOD_CONF_DIR')
       sys.exit(1)
   
-  if hodConfig['hod'].has_key('operation') and \
-    hodConfig['hod'].has_key('script'):
-    print "Script execution and hod operations are mutually exclusive."
-    hodOptions.print_help(sys.stderr)
-    sys.exit(1)
+    status = True
+    statusMsgs = []
   
-  if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
-    print "HOD requires at least a script or operation be specified."
-    hodOptions.print_help(sys.stderr)
-    sys.exit(1)    
+    (status,statusMsgs) = hodConfig.verify()
+    if not status:
+      print >>sys.stderr,"error: bin/hod failed to start."
+      for msg in statusMsgs:
+        print >>sys.stderr,"%s" % (msg)
+      sys.exit(1)
   
-  if hodConfig['gridservice-hdfs']['external']:
-    hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'], 
-                             hodConfig['gridservice-hdfs']['fs_port'])
-
-    hdfsSocket = tcpSocket(hdfsAddress)
-      
-    try:
-      hdfsSocket.open()
-      hdfsSocket.close()
-    except tcpError:
-      printErrors(hodConfig.var_error('hod', 'gridservice-hdfs', 
-        "Failed to open a connection to external hdfs address: %s." % 
-        hdfsAddress))
+    ## TODO : should move the dependency verification to hodConfig.verify
+    if hodConfig['hod'].has_key('script') \
+      and not hodConfig['hod'].has_key('min-nodes'):
+      printErrors(hodConfig.var_error('hod', 'min-nodes',
+          "hod.min-nodes must be specified when using hod.script option."))
       sys.exit(1)
-  else:
-    hodConfig['gridservice-hdfs']['host'] = 'localhost'
-
-  if hodConfig['gridservice-mapred']['external']:
-    mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'], 
-                               hodConfig['gridservice-mapred']['tracker_port'])
-
-    mapredSocket = tcpSocket(mapredAddress)
-      
-    try:
-      mapredSocket.open()
-      mapredSocket.close()
-    except tcpError:
-      printErrors(hodConfig.var_error('hod', 'gridservice-mapred', 
-        "Failed to open a connection to external mapred address: %s." % 
-        mapredAddress))
+  
+    if hodConfig['hod'].has_key('min-nodes'):
+      if hodConfig['hod']['min-nodes'] < 3:
+        printErrors(hodConfig.var_error('hod', 'min-nodes',
+          "hod.min-nodes must be >= 3 nodes: %s." % 
+          hodConfig['hod']['min-nodes']))
+        sys.exit(1)
+    
+    if hodConfig['hod'].has_key('operation') and \
+      hodConfig['hod'].has_key('script'):
+      print "Script execution and hod operations are mutually exclusive."
+      hodOptions.print_help(sys.stderr)
       sys.exit(1)
-  else:
-    hodConfig['gridservice-mapred']['host'] = 'localhost'
-
-  if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
-    not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
-    op_requires_pkgs(hodConfig):
-    printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs', 
-      "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
-      + "is not defined."))
-    sys.exit(1)
-
-  if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
-    not hodConfig['gridservice-mapred'].has_key('pkgs') and \
-    op_requires_pkgs(hodConfig):
-    printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs', 
-      "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
-      + "is not defined."))
-    sys.exit(1)
-
-  if hodConfig['hodring'].has_key('log-destination-uri'):
-    if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
-      pass
-    elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
-      hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
-      hostPort = hostPort[0]
-      socket = tcpSocket(hostPort)
+    
+    if 'operation' not in hodConfig['hod'] and 'script' not in hodConfig['hod']:
+      print "HOD requires at least a script or operation be specified."
+      hodOptions.print_help(sys.stderr)
+      sys.exit(1)    
+    
+    if hodConfig['gridservice-hdfs']['external']:
+      hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'], 
+                               hodConfig['gridservice-hdfs']['fs_port'])
+  
+      hdfsSocket = tcpSocket(hdfsAddress)
+        
       try:
-        socket.open()
-        socket.close()
-      except:
-        printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
-        "Unable to contact host/port specified in log destination uri: %s" % 
-        hodConfig['hodring']['log-destination-uri']))
+        hdfsSocket.open()
+        hdfsSocket.close()
+      except tcpError:
+        printErrors(hodConfig.var_error('hod', 'gridservice-hdfs', 
+          "Failed to open a connection to external hdfs address: %s." % 
+          hdfsAddress))
         sys.exit(1)
     else:
-      printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
-        "The log destiniation uri must be of type local:// or hdfs://."))
+      hodConfig['gridservice-hdfs']['host'] = 'localhost'
+  
+    if hodConfig['gridservice-mapred']['external']:
+      mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'], 
+                                 hodConfig['gridservice-mapred']['tracker_port'])
+  
+      mapredSocket = tcpSocket(mapredAddress)
+        
+      try:
+        mapredSocket.open()
+        mapredSocket.close()
+      except tcpError:
+        printErrors(hodConfig.var_error('hod', 'gridservice-mapred', 
+          "Failed to open a connection to external mapred address: %s." % 
+          mapredAddress))
+        sys.exit(1)
+    else:
+      hodConfig['gridservice-mapred']['host'] = 'localhost'
+  
+    if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
+      not hodConfig['gridservice-hdfs'].has_key('pkgs') and \
+      op_requires_pkgs(hodConfig):
+      printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs', 
+        "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
+        + "is not defined."))
+      sys.exit(1)
+  
+    if not hodConfig['ringmaster'].has_key('hadoop-tar-ball') and \
+      not hodConfig['gridservice-mapred'].has_key('pkgs') and \
+      op_requires_pkgs(hodConfig):
+      printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs', 
+        "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
+        + "is not defined."))
       sys.exit(1)
-  ## TODO : end of should move the dependency verification to hodConfig.verif
-    
-  hodConfig['hod']['base-dir'] = rootDirectory
-  hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
-
-  dGen = DescGenerator(hodConfig)
-  hodConfig = dGen.initializeDesc()
   
-  os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
+    if hodConfig['hodring'].has_key('log-destination-uri'):
+      if hodConfig['hodring']['log-destination-uri'].startswith('file://'):
+        pass
+      elif hodConfig['hodring']['log-destination-uri'].startswith('hdfs://'):
+        hostPort = hodConfig['hodring']['log-destination-uri'][7:].split("/")
+        hostPort = hostPort[0]
+        socket = tcpSocket(hostPort)
+        try:
+          socket.open()
+          socket.close()
+        except:
+          printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
+          "Unable to contact host/port specified in log destination uri: %s" % 
+          hodConfig['hodring']['log-destination-uri']))
+          sys.exit(1)
+      else:
+        printErrors(hodConfig.var_error('hodring', 'log-destination-uri', 
+          "The log destiniation uri must be of type local:// or hdfs://."))
+        sys.exit(1)
+  
+    ## TODO : end of should move the dependency verification to hodConfig.verify
+      
+    hodConfig['hod']['base-dir'] = rootDirectory
+    hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR
+  
+    dGen = DescGenerator(hodConfig)
+    hodConfig = dGen.initializeDesc()
+    
+    os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']
+    
+    if hodConfig['hod']['debug'] == 4:
+      print ""
+      print "Using Python: %s" % sys.version
+      print ""
+   
+    hod = hodRunner(hodConfig)
   
-  if hodConfig['hod']['debug'] == 4:
-    print ""
-    print "Using Python: %s" % sys.version
-    print ""
+    # Initiate signal handling
+    hodInterrupt.set_log(hod.get_logger())
+    hodInterrupt.init_signals()
+    # Interrupts set up. Now on we handle signals only when we wish to.
+  except KeyboardInterrupt:
+    print HOD_INTERRUPTED_MESG
+    sys.exit(HOD_INTERRUPTED_CODE)
   
-  hod = hodRunner(hodConfig)
   if hodConfig['hod'].has_key('script'):
     sys.exit(hod.script())
   else:  
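Note on the restructuring above: all of bin/hod's startup (option parsing, config verification, service checks) is now inside one try block, so a Ctrl-C received before the custom signal handlers are installed still exits with a defined message and code; hodInterrupt's handlers are set up only after the hodRunner and its logger exist. A minimal sketch of the same pattern, assuming the two constants from hodlib.Common.util and a placeholder do_startup():

    import sys

    HOD_INTERRUPTED_MESG = "Hod Interrupted. Cleaning up and exiting"  # assumed value
    HOD_INTERRUPTED_CODE = 127                                         # assumed value

    def do_startup():
        # placeholder for config parsing and validation, which can block
        pass

    if __name__ == '__main__':
        try:
            do_startup()
            # custom handlers (hodInterrupt.init_signals()) are installed
            # only after this point, once a logger exists
        except KeyboardInterrupt:
            # default SIGINT handling was still active during startup
            print HOD_INTERRUPTED_MESG
            sys.exit(HOD_INTERRUPTED_CODE)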

+ 97 - 1
src/contrib/hod/bin/hodring

@@ -188,6 +188,102 @@ if __name__ == '__main__':
     service = HodRing(hodRingOptions)
     service.start()
     service.wait()
+   
+    if service.log:
+      log = service.log
+    else: 
+      log = getLogger(hodRingOptions)
+
+    list = []
+    
+    runningHadoops = service.getRunningValues()
+      
+    for cmd in runningHadoops:
+      log.debug("addding %s to cleanup list..." % cmd)
+      cmd.addCleanup(list)
+    
+    list.append(service.getTempDir())
+    log.debug(list)
+       
+    # archive_logs now
+    cmdString = os.path.join(rootDirectory, "bin", "hodcleanup") # same python
+
+    if (len(runningHadoops) == 0):
+      log.info("len(runningHadoops) == 0, No running cluster?")
+      log.info("Skipping __copy_archive_to_dfs")
+      hadoopString = ""
+    else: hadoopString=runningHadoops[0].path
+
+    #construct the arguments
+    if hodRingOptions['hodring'].has_key('log-destination-uri'):
+      cmdString = cmdString + " --log-destination-uri " \
+                    + hodRingOptions['hodring']['log-destination-uri']
+
+    hadoopLogDirs = service.getHadoopLogDirs()
+    if hadoopLogDirs:
+      cmdString = cmdString \
+                    + " --hadoop-log-dirs " \
+                    + ",".join(hadoopLogDirs)
+
+    cmdString = cmdString \
+                  + " --temp-dir " \
+                  + service._cfg['temp-dir'] \
+                  + " --hadoop-command-string " \
+                  + hadoopString \
+                  + " --user-id " \
+                  + service._cfg['userid'] \
+                  + " --service-id " \
+                  + service._cfg['service-id'] \
+                  + " --hodring-debug " \
+                  + str(hodRingOptions['hodring']['debug']) \
+                  + " --hodring-log-dir " \
+                  + hodRingOptions['hodring']['log-dir'] \
+                  + " --hodring-cleanup-list " \
+                  + ",".join(list)
+
+    if hodRingOptions['hodring'].has_key('syslog-address'):
+      cmdString = cmdString + " --hodring-syslog-address " \
+                + hodRingOptions['hodring']['syslog-address']
+    if service._cfg.has_key('pkgs'):
+      cmdString = cmdString + " --pkgs " + service._cfg['pkgs']
+
+    log.info("cleanup commandstring : ")
+    log.info(cmdString)
+
+    # clean up
+    cmd = ['/bin/sh', '-c', cmdString]
+
+    mswindows = (sys.platform == "win32")
+    originalcwd = os.getcwd()
+
+    if not mswindows:
+      try: 
+        pid = os.fork() 
+        if pid > 0:
+          # exit first parent
+          log.info("child(pid: %s) is now doing cleanup" % pid)
+          sys.exit(0) 
+      except OSError, e: 
+        log.error("fork failed: %d (%s)" % (e.errno, e.strerror)) 
+        sys.exit(1)
+
+      # decouple from parent environment
+      os.chdir("/") 
+      os.setsid() 
+      os.umask(0) 
+ 
+    MAXFD = 128 # more than enough file descriptors to close. Just in case.
+    for i in xrange(0, MAXFD):
+      try:
+        os.close(i)
+      except OSError:
+        pass
+  
+    try:
+      os.execvp(cmd[0], cmd)
+    finally:
+      log.critical("exec failed")
+      os._exit(1)
 
   except Exception:
     if service:
@@ -195,4 +291,4 @@ if __name__ == '__main__':
         log = service.log
     else:
       log = getLogger(hodRingOptions)
-    log.error("bin/hodring failed to start. %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))
+    log.error("Error in bin/hodring %s. \nStack trace:\n%s" %(get_exception_error_string(),get_exception_string()))

+ 10 - 5
src/contrib/hod/bin/ringmaster

@@ -122,7 +122,7 @@ defList = { 'ringmaster' : (
               False, None, False, True),    
 
              ('pbs-account', 'string', 'User Account jobs are submitted under.',
-              False, None, True, False),
+              False, None, False, False),
 
              ('queue', 'string', 'Queue of the batch scheduler to query.',
               False, None, False, False),
@@ -317,14 +317,19 @@ if __name__ == '__main__':
   confDef.add_defs(defList, defOrder)
   ringMasterOptions = options(confDef, "./%s [OPTIONS]" % myName, VERSION)
   ensureLogDir(ringMasterOptions['ringmaster']['log-dir'])
-  log = getLogger(ringMasterOptions['ringmaster'])
+  log = None
 
   try:
+    log = getLogger(ringMasterOptions['ringmaster'])
     (status, statusMsgs) = ringMasterOptions.verify()
     if not status:
       raise Exception("%s" % statusMsgs)
+    ringMasterOptions.replace_escape_seqs()
     ringMasterOptions['ringmaster']['base-dir'] = rootDirectory 
-    main(ringMasterOptions,log)
-    sys.exit(0)
+    ret = main(ringMasterOptions,log)
+    sys.exit(ret)
   except Exception, e:
-    log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+    if log:
+      log.error("bin/ringmaster failed to start.%s. Stack trace follows:\n%s" % (get_exception_error_string(),get_exception_string()))
+    # Ringmaster failing to start is a ringmaster error. Exit with the appropriate exit code.
+    sys.exit(6)

+ 1 - 0
src/contrib/hod/conf/hodrc

@@ -6,6 +6,7 @@ cluster-factor                  = 1.8
 xrs-port-range                  = 32768-65536
 debug                           = 3
 allocate-wait-time              = 3600
+temp-dir                        = /tmp/hod
 
 [ringmaster]
 register                        = True

+ 1 - 1
src/contrib/hod/getting_started.txt

@@ -26,7 +26,7 @@ functionality from HOD:
   cluster. However, it can also use a pre-installed version of Hadoop,
   if it is available on all nodes in the cluster.
   (http://lucene.apache.org/hadoop)
-  HOD currently supports only Hadoop 0.16, which is under development.
+  HOD currently supports Hadoop 0.15 and above.
 
 NOTE: HOD configuration requires the location of installs of these 
 components to be the same on all nodes in the cluster. It will also 

+ 1 - 30
src/contrib/hod/hodlib/Common/desc.py

@@ -125,38 +125,9 @@ class ServiceDesc:
     self.dict.setdefault('pkgs', '')
     self.dict.setdefault('final-attrs', {})
     self._checkRequired()
-    self.__dict_update()
-
-  def __dict_update(self):
-    getattr(self, "_%s" % self.dict['id'])()
-
-  def _mapred(self):
-    if self.isExternal():
-      self.dict['final-attrs']['mapred.job.tracker'] = "%s:%s" % (self.dict['host'], 
-        self.dict['tracker_port'])
-      
-      # self.dict['final-attrs']['mapred.job.tracker.info.port'] = \
-      #   str(self.dict['info_port'])
-      # After Hadoop-2185
-      self.dict['final-attrs']['mapred.job.tracker.http.bindAddress'] = \
-        "%s:%s" %(self.dict['host'], self.dict['info_port'])
-      
     if self.dict.has_key('hadoop-tar-ball'):
       self.dict['tar'] = self.dict['hadoop-tar-ball']  
-  
-  def _hdfs(self):
-    if self.isExternal():
-      self.dict['final-attrs']['fs.default.name'] = "%s:%s" % (self.dict['host'], 
-        self.dict['fs_port'])
-      
-      # self.dict['final-attrs']['dfs.info.port'] = str(self.dict['info_port'])
-      # After Hadoop-2185
-      self.dict['final-attrs']['dfs.http.bindAddress'] = "%s:%s" % \
-        (self.dict['host'], self.dict['info_port'])
-      
-    if self.dict.has_key('hadoop-tar-ball'):
-      self.dict['tar'] = self.dict['hadoop-tar-ball']
-  
+
   def _checkRequired(self):
 
     if not 'id' in self.dict:

+ 1 - 2
src/contrib/hod/hodlib/Common/hodsvc.py

@@ -15,7 +15,6 @@
 #limitations under the License.
 # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/13/2007
 #------------------------------------------------------------------------------
 import os, time, shutil, xmlrpclib, socket, pprint
 
@@ -51,7 +50,7 @@ class hodBaseService:
     
     self._init_logging()
         
-    self._init_signals()
+    if name != 'serviceRegistry': self._init_signals()
     self._init_xrc_server()
     
   def __set_logging_level(self, level):

+ 26 - 5
src/contrib/hod/hodlib/Common/setup.py

@@ -16,7 +16,6 @@
 # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
 # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
 #------------------------------------------------------------------------------
 
 """'setup' provides for reading and verifing configuration files based on
@@ -26,7 +25,7 @@ import sys, os, re, pprint
 
 from ConfigParser import SafeConfigParser
 from optparse import OptionParser, IndentedHelpFormatter, OptionGroup
-from util import get_perms
+from util import get_perms, replace_escapes
 from types import typeValidator, is_valid_type, typeToString
 
 reEmailAddress = re.compile("^.*@.*$")
@@ -37,6 +36,8 @@ reCommentHack = re.compile("^.*?\s+#|;.*", flags=re.S)
 reCommentNewline = re.compile("\W$")
 reKeyVal = r"(?<!\\)="
 reKeyVal = re.compile(reKeyVal)
+reKeyValList = r"(?<!\\),"
+reKeyValList = re.compile(reKeyValList)
 
 errorPrefix = 'error'
 requiredPerms = '0660'
@@ -485,7 +486,7 @@ class config(SafeConfigParser, baseConfig):
                            # Append to the current list of values in self._dict
                            if not self._dict[section].has_key(option):
                              self._dict[section][option] = ""
-                           dictOpts = self._dict[section][option].split(",")
+                           dictOpts = reKeyValList.split(self._dict[section][option])
                            dictOptsKeyVals = {}
                            for opt in dictOpts:
                               if opt != '':
@@ -495,13 +496,16 @@ class config(SafeConfigParser, baseConfig):
                                   # we only consider the first '=' for splitting
                                   # we do this to support passing params like
                                   # mapred.child.java.opts=-Djava.library.path=some_dir
+                                  # Even in case of an invalid entry like an unescaped '=',
+                                  # we don't want to fail right here. Such errors are left
+                                  # to be caught during validation, which happens after this
                                   dictOptsKeyVals[key] = val
                                 else: 
                                   # this means an invalid option. Leaving it
                                   #for config.verify to catch
                                   dictOptsKeyVals[opt] = None
                                 
-                           cmdLineOpts = self._options[section][option].split(",")
+                           cmdLineOpts = reKeyValList.split(self._options[section][option])
 
                            for opt in cmdLineOpts:
                               if reKeyVal.search(opt):
@@ -573,6 +577,10 @@ class config(SafeConfigParser, baseConfig):
             raise Exception( error)
             sys.exit(1)
 
+    def replace_escape_seqs(self):
+      """ replace any escaped characters """
+      replace_escapes(self)
+
 class formatter(IndentedHelpFormatter):
     def format_option_strings(self, option):
         """Return a comma-separated list of option strings & metavariables."""
@@ -667,11 +675,21 @@ class options(OptionParser, baseConfig):
             self.config = self.__parsedOptions.config
             if not self.config:
                 self.error("configuration file must be specified")
+            if not os.path.isabs(self.config):
+                # A relative path. Append the original directory which would be the
+                # current directory at the time of launch
+                try:  
+                    origDir = getattr(self.__parsedOptions, 'hod.original-dir')
+                    if origDir is not None:
+                        self.config = os.path.join(origDir, self.config)
+                        self.__parsedOptions.config = self.config
+                except AttributeError, e:
+                    self.error("hod.original-dir is not defined.\
+                                   Cannot get current directory")
             if not os.path.exists(self.config):
                 if self.__defaultLoc and not re.search("/", self.config):
                     self.__parsedOptions.config = os.path.join(
                         self.__defaultLoc, self.config)
-    
         self.__build_dict()   
 
     
@@ -910,3 +928,6 @@ class options(OptionParser, baseConfig):
                         
     def verify(self):
         return baseConfig.verify(self)
+
+    def replace_escape_seqs(self):
+      replace_escapes(self)
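The reKeyValList pattern introduced above splits comma-separated key/value lists only on commas that are not preceded by a backslash, so values themselves may contain escaped commas (replace_escape_seqs later strips the backslashes). A small illustration of the negative lookbehind:

    import re

    # split on ',' only when it is not escaped as '\,'
    reKeyValList = re.compile(r"(?<!\\),")

    s = r"a=1,b=2\,3"
    print reKeyValList.split(s)
    # -> ['a=1', 'b=2\\,3']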

+ 0 - 1
src/contrib/hod/hodlib/Common/socketServers.py

@@ -15,7 +15,6 @@
 #limitations under the License.
 # Various socket server and helper classes.
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 03/07/2007
 #
 import os, sys, socket, threading, pprint, re, xmlrpclib, time
   

+ 0 - 1
src/contrib/hod/hodlib/Common/tcp.py

@@ -15,7 +15,6 @@
 #limitations under the License.
 # $Id:tcp.py 6172 2007-05-22 20:26:54Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
 #------------------------------------------------------------------------------
 
 """ TCP related classes. """

+ 5 - 2
src/contrib/hod/hodlib/Common/threads.py

@@ -132,13 +132,16 @@ class simpleCommand(baseThread):
                 output = cmd.fromchild.readline()
 
         elif self.__wait == False:
-            for output in cmd.fromchild.readlines():
+            output = cmd.fromchild.readline()
+            while output != '':
                 while not self.running.isSet():
                     if self.stopFlag.isSet():
                         break
                     time.sleep(1)
-                
                 print output,
+                if self.stopFlag.isSet():
+                    break
+                output = cmd.fromchild.readline()
         else:
             self.stdout = cmd.fromchild
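The rewrite above replaces readlines(), which blocks until the child process closes its output, with an incremental readline() loop that relays each line as it arrives and re-checks the stop flag between lines. The core of the relay, sketched against a plain file-like pipe (stopFlag stands in for the thread's event object):

    def relay(pipe, stopFlag):
        # readline() returns '' only at EOF, so this streams line by line
        output = pipe.readline()
        while output != '':
            print output,            # trailing comma: line already ends in \n
            if stopFlag.isSet():     # allow a clean stop between lines
                break
            output = pipe.readline()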
 

+ 15 - 13
src/contrib/hod/hodlib/Common/types.py

@@ -15,7 +15,6 @@
 #limitations under the License.
 # $Id:types.py 6172 2007-05-22 20:26:54Z zim $
 #
-# Christopher Zimmerman - zim@yahoo-inc.com - 04/07/2007
 #------------------------------------------------------------------------------
 
 """ Higher level data types and type related classes.
@@ -325,12 +324,17 @@ class typeToString:
         return value
 
     def __tostring_keyval(self, value):
-        string = ''
+        string = '"' # to protect from shell escapes
         for key in value:
-            for item in value[key]:
-                string = "%s%s=%s," % (string, key, item)
-                
-        return string[:-1]  
+          # for item in value[key]:
+          #      string = "%s%s=%s," % (string, key, item)
+          # Quotes still cannot protect Double-slashes.
+          # Dealing with them separately
+          val = re.sub(r"\\\\",r"\\\\\\\\",value[key])
+
+          string = "%s%s=%s," % (string, key, val)
+
+        return string[:-1] + '"'
 
     def __tostring_list(self, value):
         string = ''
@@ -678,13 +682,11 @@ class typeValidator:
         list = self.__norm_list(value)
         keyValue = {}
         for item in list:
-            # we only consider the first '=' for splitting
-            # we do this to support passing params like 
-            # mapred.child.java.opts=-Djava.library.path=some_dir
-            (key, value) = reKeyVal.split(item,1)
-            if not keyValue.has_key(key):
-                keyValue[key] = []
-            keyValue[key].append(value)
+            (key, value) = reKeyVal.split(item)
+            #if not keyValue.has_key(key):
+            #    keyValue[key] = []
+            #keyValue[key].append(value)
+            keyValue[key] = value
         return keyValue     
 
     def __verify_list(self, type, value):

+ 120 - 1
src/contrib/hod/hodlib/Common/util.py

@@ -13,12 +13,17 @@
 #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #See the License for the specific language governing permissions and
 #limitations under the License.
-import sys, os, traceback, stat, socket, re, warnings
+import sys, os, traceback, stat, socket, re, warnings, signal
 
 from hodlib.Common.tcp import tcpSocket, tcpError 
 from hodlib.Common.threads import simpleCommand
 
 setUGV   = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }
+reEscapeSeq = r"\\(.)?"
+reEscapeSeq = re.compile(reEscapeSeq)
+
+HOD_INTERRUPTED_CODE = 127
+HOD_INTERRUPTED_MESG = "Hod Interrupted. Cleaning up and exiting"
 
 class AlarmException(Exception):
     def __init__(self, msg=''):
@@ -170,3 +175,117 @@ def args_to_string(list):
   for item in list:
     arg = "%s%s " % (arg, item)
   return arg[:-1]
+
+def replace_escapes(object):
+  """ replace any escaped character. e.g \, with , \= with = and so on """
+  # here object is either a config object or a options object
+  for section in object._mySections:
+    for option in object._configDef[section].keys():
+      if object[section].has_key(option):
+        if object._configDef[section][option]['type'] == 'keyval':
+          keyValDict = object[section][option]
+          object[section][option] = {}
+          for (key,value) in keyValDict.iteritems():
+            match = reEscapeSeq.search(value)
+            if match:
+              value = reEscapeSeq.sub(r"\1", value)
+            object[section][option][key] = value
+
+def hadoopVersion(hadoopDir, java_home, log):
+  # Determine the version of hadoop being used by executing the 
+  # hadoop version command. Code earlier in idleTracker.py
+  hadoopVersion = { 'major' : None, 'minor' : None }
+  hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
+  cmd = "%s version" % hadoopPath
+  log.debug('Executing command %s to find hadoop version' % cmd)
+  env = os.environ
+  env['JAVA_HOME'] = java_home
+  hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
+  hadoopVerCmd.start()
+  hadoopVerCmd.wait()
+  hadoopVerCmd.join()
+  if hadoopVerCmd.exit_code() == 0:
+    verLine = hadoopVerCmd.output()[0]
+    log.debug('Version from hadoop command: %s' % verLine)
+    hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
+    verMatch = hadoopVerRegExp.match(verLine)
+    if verMatch != None:
+      hadoopVersion['major'] = verMatch.group(1)
+      hadoopVersion['minor'] = verMatch.group(2)
+  return hadoopVersion
+
+
+def get_cluster_status(hdfsAddress, mapredAddress):
+  """Determine the status of the cluster based on socket availability
+     of HDFS and Map/Reduce."""
+  status = 0
+
+  mapredSocket = tcpSocket(mapredAddress)
+  try:
+    mapredSocket.open()
+    mapredSocket.close()
+  except tcpError:
+    status = 14
+
+  hdfsSocket = tcpSocket(hdfsAddress)
+  try:
+    hdfsSocket.open()
+    hdfsSocket.close()
+  except tcpError:
+    if status > 0:
+      status = 10
+    else:
+      status = 13
+
+  return status
+
+def parseEquals(list):
+  # takes in a list of keyval pairs e.g ['a=b','c=d'] and returns a
+  # dict e.g. {'a':'b','c':'d'}. Used in GridService/{mapred.py/hdfs.py} and 
+  # HodRing/hodring.py. No need for specially treating escaped =. as in \=,
+  # since all keys are generated by hod and don't contain such anomalies
+  dict = {}
+  for elems in list:
+    splits = elems.split('=')
+    dict[splits[0]] = splits[1]
+  return dict
+
+class HodInterrupt:
+  def __init__(self):
+    self.HodInterruptFlag = False
+    self.log = None
+
+  def set_log(self, log):
+    self.log = log
+
+  def init_signals(self):
+
+    def sigStop(sigNum, handler):
+      sig_wrapper(sigNum, self.setFlag)
+
+    signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
+    signal.signal(signal.SIGQUIT, sigStop) # 3  : Quit program
+    signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program
+
+    def sig_wrapper(sigNum, handler, *args):
+      self.log.critical("Caught signal %s." % sigNum )
+
+      if args:
+          handler(args)
+      else:
+          handler()
+
+  def setFlag(self, val = True):
+    self.HodInterruptFlag = val
+
+  def isSet(self):
+    return self.HodInterruptFlag
+
+class HodInterruptException(Exception):
+  def __init__(self, value = ""):
+    self.value = value
+    
+  def __str__(self):
+    return repr(self.value)
+
+hodInterrupt = HodInterrupt()
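hodInterrupt above is a module-level singleton: init_signals() turns SIGINT, SIGTERM and SIGQUIT into a flag rather than an exception, and long-running code polls the flag at safe points, raising HodInterruptException to unwind. A hypothetical polling loop in that style:

    import time
    from hodlib.Common.util import hodInterrupt, HodInterruptException

    def wait_for(condition, timeout):
        # poll slow external state, bailing out promptly if a signal arrived
        waited = 0
        while waited < timeout:
            if hodInterrupt.isSet():
                raise HodInterruptException("interrupted while waiting")
            if condition():
                return True
            time.sleep(1)
            waited += 1
        return False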

+ 3 - 0
src/contrib/hod/hodlib/Common/xmlrpc.py

@@ -14,6 +14,7 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 import xmlrpclib, time, random, signal
+from hodlib.Common.util import hodInterrupt, HodInterruptException
 
 class hodXRClient(xmlrpclib.ServerProxy):
     def __init__(self, uri, transport=None, encoding=None, verbose=0,
@@ -42,6 +43,8 @@ class hodXRClient(xmlrpclib.ServerProxy):
                 break
             except Exception:
                 if self.__retryRequests:
+                  if hodInterrupt.isSet():
+                    raise HodInterruptException()
                   time.sleep(retryWaitTime)
                 else:
                   raise Exception("hodXRClientTimeout")

+ 65 - 36
src/contrib/hod/hodlib/GridServices/hdfs.py

@@ -22,15 +22,16 @@ import os
 from service import *
 from hodlib.Hod.nodePool import *
 from hodlib.Common.desc import CommandDesc
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, parseEquals
 
 class HdfsExternal(MasterSlave):
   """dummy proxy to external HDFS instance"""
 
-  def __init__(self, serviceDesc, workDirs):
+  def __init__(self, serviceDesc, workDirs, version):
     MasterSlave.__init__(self, serviceDesc, workDirs,None)
     self.launchedMaster = True
     self.masterInitialized = True
+    self.version = version
     
   def getMasterRequest(self):
     return None
@@ -49,21 +50,33 @@ class HdfsExternal(MasterSlave):
     addr = attrs['fs.default.name']
     return [addr]
   
-  def setMasterParams(self, list):
-    raise NotImplementedError
+  def setMasterParams(self, dict):
+   self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
+     (dict['host'], dict['fs_port'])
+
+   if self.version < 16:
+    self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
+                                    str(self.serviceDesc.dict['info_port'])
+   else:
+     # After Hadoop-2185
+     self.serviceDesc.dict['final-attrs']['dfs.http.bindAddress'] = "%s:%s" % \
+       (dict['host'], dict['info_port'])
 
   def getInfoAddrs(self):
     attrs = self.serviceDesc.getfinalAttrs()
-    addr = attrs['fs.default.name']
-    k,v = addr.split( ":")
-    # infoaddr = k + ':' + attrs['dfs.info.port']
-    # After Hadoop-2185
-    infoaddr = attrs['dfs.http.bindAddress']
+    if self.version < 16:
+      addr = attrs['fs.default.name']
+      k,v = addr.split( ":")
+      infoaddr = k + ':' + attrs['dfs.info.port']
+    else:
+      # After Hadoop-2185
+      infoaddr = attrs['dfs.http.bindAddress']
     return [infoaddr]
 
 class Hdfs(MasterSlave):
 
-  def __init__(self, serviceDesc, nodePool, required_node, format=True, upgrade=False):
+  def __init__(self, serviceDesc, nodePool, required_node, version, \
+                                        format=True, upgrade=False):
     MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
     self.masterNode = None
     self.masterAddr = None
@@ -73,6 +86,7 @@ class Hdfs(MasterSlave):
     self.format = format
     self.upgrade = upgrade
     self.workers = []
+    self.version = version
 
   def getMasterRequest(self):
     req = NodeRequest(1, [], False)
@@ -124,16 +138,14 @@ class Hdfs(MasterSlave):
     self.masterAddr = dict['fs.default.name']
     k,v = self.masterAddr.split( ":")
     self.masterNode = k
-    # self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
-    # After Hadoop-2185
-    self.infoAddr = dict['dfs.http.bindAddress']
+    if self.version < 16:
+      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
+    else:
+      # After Hadoop-2185
+      self.infoAddr = dict['dfs.http.bindAddress']
    
   def _parseEquals(self, list):
-    dict = {}
-    for elems in list:
-      splits = elems.split('=')
-      dict[splits[0]] = splits[1]
-    return dict
+    return parseEquals(list)
   
   def _getNameNodePort(self):
     sd = self.serviceDesc
@@ -152,16 +164,25 @@ class Hdfs(MasterSlave):
   def _getNameNodeInfoPort(self):
     sd = self.serviceDesc
     attrs = sd.getfinalAttrs()
-    if 'dfs.http.bindAddress' not in attrs:
-      return ServiceUtil.getUniqPort()
+    if self.version < 16:
+      if 'dfs.info.bindAddress' not in attrs:
+        return ServiceUtil.getUniqPort()
+    else:
+      if 'dfs.http.bindAddress' not in attrs:
+        return ServiceUtil.getUniqPort()
 
-    # p = attrs['dfs.info.port'] 
-    p = attrs['dfs.http.bindAddress'].split(':')[1]
+    if self.version < 16:
+      p = attrs['dfs.info.port']
+    else:
+      p = attrs['dfs.http.bindAddress'].split(':')[1]
     try:
       return int(p)
     except:
       print get_exception_string()
-      raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
+      if self.version < 16:
+        raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
+      else:
+        raise ValueError, "Can't find port from attr dfs.http.bindAddress: %s" % (p)
 
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
     namedir = None
@@ -183,7 +204,7 @@ class Hdfs(MasterSlave):
     attrs['dfs.name.dir'] = namedir
     attrs['dfs.data.dir'] = ','.join(datadir)
     # FIXME -- change dfs.client.buffer.dir
-    envs['HADOOP_ROOT_LOGGER'] = ["INFO,DRFA",]
+    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
 
 
   def _getNameNodeCommand(self, format=False, upgrade=False):
@@ -199,13 +220,14 @@ class Hdfs(MasterSlave):
       attrs['fs.default.name'] = 'fillinhostport'
     #self.infoPort = port = self._getNameNodeInfoPort()
  
-    # if 'dfs.info.port' not in attrs:
-    #  attrs['dfs.info.port'] = 'fillinport'
-   
-    # Addressing Hadoop-2815, added the following. Earlier version don't
-    # care about this
-    if 'dfs.http.bindAddress' not in attrs:
-      attrs['dfs.http.bindAddress'] = 'fillinhostport'
+    if self.version < 16:
+     if 'dfs.info.port' not in attrs:
+      attrs['dfs.info.port'] = 'fillinport'
+    else:
+      # Addressing Hadoop-2815, added the following. Earlier versions don't
+      # care about this
+      if 'dfs.http.bindAddress' not in attrs:
+        attrs['dfs.http.bindAddress'] = 'fillinhostport'
 
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')
 
@@ -277,11 +299,18 @@ class Hdfs(MasterSlave):
 
     attrs['fs.default.name'] = nn
 
-    # Adding the following. Hadoop-2815
-    if 'dfs.datanode.bindAddress' not in attrs:
-      attrs['dfs.datanode.bindAddress'] = 'fillinhostport'
-    if 'dfs.datanode.http.bindAddress' not in attrs:
-      attrs['dfs.datanode.http.bindAddress'] = 'fillinhostport'
+    if self.version < 16:
+      if 'dfs.datanode.port' not in attrs:
+        attrs['dfs.datanode.port'] = 'fillinport'
+      if 'dfs.datanode.info.port' not in attrs:
+        attrs['dfs.datanode.info.port'] = 'fillinport'
+    else:
+      # Adding the following. Hadoop-2815
+      if 'dfs.datanode.bindAddress' not in attrs:
+        attrs['dfs.datanode.bindAddress'] = 'fillinhostport'
+      if 'dfs.datanode.http.bindAddress' not in attrs:
+        attrs['dfs.datanode.http.bindAddress'] = 'fillinhostport'
+    
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')
 
     dict = { 'name' : 'datanode' }
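Both hdfs.py above and mapred.py below branch the same way on the Hadoop version: releases before 0.16 configure info servers through separate *.info.port / *.port keys, while 0.16 and later (after HADOOP-2185) use host:port *.bindAddress keys. The dispatch reduces to a pattern like this (select_info_key is a hypothetical name; the key strings are the ones used in the hunks):

    def select_info_key(version):
        # version is the minor release number, e.g. 15 or 16
        if version < 16:
            return 'dfs.info.port'         # pre-0.16: port-only key
        else:
            return 'dfs.http.bindAddress'  # post HADOOP-2185: host:port key

    assert select_info_key(15) == 'dfs.info.port'
    assert select_info_key(16) == 'dfs.http.bindAddress'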

+ 71 - 43
src/contrib/hod/hodlib/GridServices/mapred.py

@@ -22,15 +22,16 @@ import os, copy, time
 from service import *
 from hodlib.Hod.nodePool import *
 from hodlib.Common.desc import CommandDesc
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, parseEquals
 
 class MapReduceExternal(MasterSlave):
   """dummy proxy to external MapReduce instance"""
 
-  def __init__(self, serviceDesc, workDirs):
+  def __init__(self, serviceDesc, workDirs, version):
     MasterSlave.__init__(self, serviceDesc, workDirs,None)
     self.launchedMaster = True
     self.masterInitialized = True
+    self.version = version
     
   def getMasterRequest(self):
     return None
@@ -55,22 +56,33 @@ class MapReduceExternal(MasterSlave):
   def needsLess(self):
     return 0
 
-  def setMasterParams(self, list):
-    raise NotImplementedError
-  
+  def setMasterParams(self, dict):
+    self.serviceDesc['final-attrs']['mapred.job.tracker'] = "%s:%s" % (dict['host'], 
+      dict['tracker_port'])
+    
+    if self.version < 16:
+      self.serviceDesc.dict['final-attrs']['mapred.job.tracker.info.port'] = \
+                                      str(self.serviceDesc.dict['info_port'])
+    else:
+      # After Hadoop-2185
+      self.serviceDesc['final-attrs']['mapred.job.tracker.http.bindAddress'] = \
+        "%s:%s" %(dict['host'], dict['info_port'])
+
   def getInfoAddrs(self):
     attrs = self.serviceDesc.getfinalAttrs()
-    addr = attrs['mapred.job.tracker']
-    k,v = addr.split( ":")
-    # infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
-    # After Hadoop-2185
-    # Note: earlier,we never respected mapred.job.tracker.http.bindAddress
-    infoaddr = attrs['mapred.job.tracker.http.bindAddress']
+    if self.version < 16:
+      addr = attrs['mapred.job.tracker']
+      k,v = addr.split( ":")
+      infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
+    else:
+      # After Hadoop-2185
+      # Note: earlier,we never respected mapred.job.tracker.http.bindAddress
+      infoaddr = attrs['mapred.job.tracker.http.bindAddress']
     return [infoaddr]
   
 class MapReduce(MasterSlave):
 
-  def __init__(self, serviceDesc, workDirs,required_node):
+  def __init__(self, serviceDesc, workDirs,required_node, version):
     MasterSlave.__init__(self, serviceDesc, workDirs,required_node)
 
     self.masterNode = None
@@ -78,6 +90,7 @@ class MapReduce(MasterSlave):
     self.infoAddr = None
     self.workers = []
     self.required_node = required_node
+    self.version = version
 
   def isLaunchable(self, serviceDict):
     hdfs = serviceDict['hdfs']
@@ -127,16 +140,14 @@ class MapReduce(MasterSlave):
     self.masterAddr = dict['mapred.job.tracker']
     k,v = self.masterAddr.split(":")
     self.masterNode = k
-    # self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
-    # After Hadoop-2185
-    self.infoAddr = dict['mapred.job.tracker.http.bindAddress']
+    if self.version < 16:
+      self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
+    else:
+      # After Hadoop-2185
+      self.infoAddr = dict['mapred.job.tracker.http.bindAddress']
   
   def _parseEquals(self, list):
-    dict = {}
-    for elems in list:
-      splits = elems.split('=')
-      dict[splits[0]] = splits[1]
-    return dict
+    return parseEquals(list)
 
   def _getJobTrackerPort(self):
     sd = self.serviceDesc
@@ -152,21 +163,29 @@ class MapReduce(MasterSlave):
       print get_exception_string()
       raise ValueError, "Can't find port from attr mapred.job.tracker: %s" % (v)
 
+  # UNUSED METHOD
   def _getJobTrackerInfoPort(self):
     sd = self.serviceDesc
     attrs = sd.getfinalAttrs()
-    # if not 'mapred.job.tracker.info.port' in attrs:
-    if 'mapred.job.tracker.http.bindAddress' not in attrs:
-      return ServiceUtil.getUniqPort()
-
-    # p = attrs['mapred.job.tracker.info.port']
-    p = attrs['mapred.job.tracker.http.bindAddress']
+    if self.version < 16:
+      if not 'mapred.job.tracker.info.port' in attrs:
+        return ServiceUtil.getUniqPort()
+    else:
+      if 'mapred.job.tracker.http.bindAddress' not in attrs:
+        return ServiceUtil.getUniqPort()
+
+    if self.version < 16:
+      p = attrs['mapred.job.tracker.info.port']
+    else:
+      p = attrs['mapred.job.tracker.http.bindAddress'].split(':')[1]
     try:
       return int(p)
     except:
       print get_exception_string()
-      # raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
-      raise ValueError, "Can't find port from attr mapred.job.tracker.http.bindAddress: %s" % (p)
+      if self.version < 16:
+        raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
+      else:
+        raise ValueError, "Can't find port from attr mapred.job.tracker.http.bindAddress: %s" % (p)
 
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
     local = []
@@ -193,7 +212,7 @@ class MapReduce(MasterSlave):
     attrs['dfs.client.buffer.dir'] = ','.join(dfsclient)
 
 
-    envs['HADOOP_ROOT_LOGGER'] = ["INFO,DRFA",]
+    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
 
 
   def _getJobTrackerCommand(self, hdfs):
@@ -201,25 +220,28 @@ class MapReduce(MasterSlave):
 
     parentDirs = self.workDirs
     workDirs = []
-    attrs = sd.getfinalAttrs()
-    envs = sd.getEnvs()
+    attrs = sd.getfinalAttrs().copy()
+    envs = sd.getEnvs().copy()
 
     #self.masterPort = port = self._getJobTrackerPort()
     if 'mapred.job.tracker' not in attrs:
       attrs['mapred.job.tracker'] = 'fillinhostport'
 
     #self.infoPort = port = self._getJobTrackerInfoPort()
-    # if 'mapred.job.tracker.info.port' not in attrs:
-    #   attrs['mapred.job.tracker.info.port'] = 'fillinport'
+    if self.version < 16:
+      if 'mapred.job.tracker.info.port' not in attrs:
+        attrs['mapred.job.tracker.info.port'] = 'fillinport'
+    else:
+      # Addressing Hadoop-2815,
+      if 'mapred.job.tracker.http.bindAddress' not in attrs:
+        attrs['mapred.job.tracker.http.bindAddress'] = 'fillinhostport'
 
     attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
-    # Addressing Hadoop-2815,
-    if 'mapred.job.tracker.http.bindAddress' not in attrs:
-      attrs['mapred.job.tracker.http.bindAddress'] = 'fillinhostport'
 
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')
 
     dict = { 'name' : 'jobtracker' }
+    dict['version'] = self.version
     dict['program'] = os.path.join('bin', 'hadoop')
     dict['argv'] = ['jobtracker']
     dict['envs'] = envs
@@ -236,8 +258,8 @@ class MapReduce(MasterSlave):
 
     parentDirs = self.workDirs
     workDirs = []
-    attrs = sd.getfinalAttrs()
-    envs = sd.getEnvs()
+    attrs = sd.getfinalAttrs().copy()
+    envs = sd.getEnvs().copy()
     jt = self.masterAddr
 
     if jt == None:
@@ -246,11 +268,17 @@ class MapReduce(MasterSlave):
     attrs['mapred.job.tracker'] = jt
     attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
 
-    # Adding the following. Hadoop-2815
-    if 'mapred.task.tracker.report.bindAddress' not in attrs:
-      attrs['mapred.task.tracker.report.bindAddress'] = 'fillinhostport'
-    if 'mapred.task.tracker.http.bindAddress' not in attrs:
-      attrs['mapred.task.tracker.http.bindAddress'] = 'fillinhostport'
+    if self.version < 16:
+      if 'tasktracker.http.port' not in attrs:
+        attrs['tasktracker.http.port'] = 'fillinport'
+      # prior to 16, tasktrackers always took ephemeral port 0 for
+      # tasktracker.report.bindAddress
+    else:
+      # Adding the following. Hadoop-2815
+      if 'mapred.task.tracker.report.bindAddress' not in attrs:
+        attrs['mapred.task.tracker.report.bindAddress'] = 'fillinhostport'
+      if 'mapred.task.tracker.http.bindAddress' not in attrs:
+        attrs['mapred.task.tracker.http.bindAddress'] = 'fillinhostport'
 
     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')
 

+ 186 - 124
src/contrib/hod/hodlib/Hod/hadoop.py

@@ -57,8 +57,8 @@ class hadoopConfig:
     
     return prop
 
-  def gen_site_conf(self, confDir, numNodes, hdfsAddr, mapredAddr=None,\
-             clientParams=None, serverParams=None,\
+  def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr,\
+             mapredAddr=None, clientParams=None, serverParams=None,\
              finalServerParams=None, clusterFactor=None):
     if not mapredAddr:
       mapredAddr = "dummy:8181"
@@ -69,51 +69,58 @@ class hadoopConfig:
       "This is an auto generated hadoop-site.xml, do not modify")
     topElement = doc.documentElement
     topElement.appendChild(comment)
-    prop = self.__create_xml_element(doc, 'mapred.job.tracker', 
-                                     mapredAddr, "description")
-    topElement.appendChild(prop)
-    prop = self.__create_xml_element(doc, 'fs.default.name', hdfsAddr, 
-                                   "description")
-    topElement.appendChild(prop)
+
+    description = {}
+    paramsDict = {  'mapred.job.tracker'    : mapredAddr , \
+                    'fs.default.name'       : hdfsAddr, \
+                    'hadoop.tmp.dir'        : confDir, \
+                    'dfs.client.buffer.dir' : tempDir, }
+
     mapredAddrSplit = mapredAddr.split(":")
     mapredsystem = os.path.join('/mapredsystem', mapredAddrSplit[0])
-    prop = self.__create_xml_element(doc, 'mapred.system.dir', mapredsystem, 
-                                   "description", True )
-    topElement.appendChild(prop)
-    prop = self.__create_xml_element(doc, 'hadoop.tmp.dir', confDir, 
-                                   "description")
-    topElement.appendChild(prop)
-    prop = self.__create_xml_element(doc, 'dfs.client.buffer.dir', 
-                                     confDir, "description")
-    topElement.appendChild(prop)
-
-    # clientParams aer enabled now
-    if clientParams:
-      for k, v in clientParams.iteritems():
-        prop = self.__create_xml_element(doc, k, v[0], "client param")
-        topElement.appendChild(prop)
-
+    paramsDict['mapred.system.dir'] = mapredsystem 
+    
+    # mapred-default.xml is no longer used now.
+    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
+    paramsDict['mapred.reduce.tasks'] = str(numred)
     # end
 
-    # servelParams
-    if serverParams:
-      for k, v in serverParams.iteritems():
-        prop = self.__create_xml_element(doc, k, v[0], "server param")
-        topElement.appendChild(prop)
+    # for all the above vars generated, set the description
+    for k, v in paramsDict.iteritems():
+      description[k] = 'Hod generated parameter'
 
     # finalservelParams
     if finalServerParams:
       for k, v in finalServerParams.iteritems():
-        prop = self.__create_xml_element(doc, k, v[0], "server param", True)
-        topElement.appendChild(prop)
+        if not description.has_key(k):
+          description[k] = "final server parameter"
+          paramsDict[k] = v
 
-   
-    # mapred-default.xml is no longer used now.
-    numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
-    prop = self.__create_xml_element(doc, "mapred.reduce.tasks", str(numred), 
-                                 "description")
-    topElement.appendChild(prop)
-    # end
+    # serverParams
+    if serverParams:
+      for k, v in serverParams.iteritems():
+        if not description.has_key(k):
+          # if no final value for same param is mentioned
+          description[k] = "server parameter"
+          paramsDict[k] = v
+
+    # clientParams
+    if clientParams:
+      for k, v in clientParams.iteritems():
+        if not description.has_key(k) or description[k] == "server parameter":
+          # Just add, if no final value for same param is mentioned.
+          # Replace even if server param is mentioned for same config variable
+          description[k] = "client-side parameter"
+          paramsDict[k] = v
+    
+    # generate the xml elements
+    for k,v in paramsDict.iteritems():
+      if ( description[k] == "final server parameter" or \
+                             description[k] == "Hod generated parameter" ): 
+         final = True
+      else: final = False
+      prop = self.__create_xml_element(doc, k, v, description[k], final)
+      topElement.appendChild(prop)
 
     siteName = os.path.join(confDir, "hadoop-site.xml")
     sitefile = file(siteName, 'w')
@@ -174,44 +181,15 @@ class hadoopCluster:
     
     return serviceData
   
-  def __check_allocation_manager(self):
-    userValid = True
-    try:
-      self.serviceProxyClient = hodXRClient(
-        to_http_url(self.__cfg['hod']['proxy-xrs-address']), None, None, 0,
-        0, 1, False, 15)
-      
-      userValid = self.serviceProxyClient.isProjectUserValid(
-        self.__setup.cfg['hod']['userid'], 
-        self.__setup.cfg['resource_manager']['pbs-account'],True)
-      
-      if userValid:
-        self.__log.debug("Validated that user %s is part of project %s." %
-          (self.__cfg['hod']['userid'], 
-           self.__cfg['resource_manager']['pbs-account']))
-      else:
-        self.__log.debug("User %s is not part of project: %s." % (
-          self.__cfg['hod']['userid'], 
-          self.__cfg['resource_manager']['pbs-account']))
-        self.__log.error("Please specify a valid project in "
-                      + "resource_manager.pbs-account. If you still have "
-                      + "issues, please contact operations")
-        userValidd = False
-        # ignore invalid project for now - TODO
-    except Exception:
-      # ignore failures - non critical for now
-      self.__log.debug(
-        "Unable to contact Allocation Manager Proxy - ignoring...")
-      #userValid = False
-        
-    return userValid
-
   def __check_job_status(self):
     initWaitCount = 20
     count = 0
     status = False
     state = 'Q'
     while state == 'Q':
+      if hodInterrupt.isSet():
+        raise HodInterruptException()
+
       state = self.__nodePool.getJobState()
       if (state==False) or (state!='Q'):
         break
@@ -241,6 +219,9 @@ class hadoopCluster:
       waitTime = self.__cfg['hod']['allocate-wait-time']
   
       while count < waitTime:
+        if hodInterrupt.isSet():
+          raise HodInterruptException()
+
         ringList = self.__svcrgyClient.getServiceInfo(
           self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
           'ringmaster', 
@@ -267,8 +248,11 @@ class hadoopCluster:
     serviceAddress = None
     serviceInfo = None
  
-    for i in range(0, 250):
+    for i in range(0, 250): 
       try:
+        if hodInterrupt.isSet():
+            raise HodInterruptException()
+
         serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
         if serviceAddress:
           if serviceAddress == 'not found':
@@ -280,6 +264,8 @@ class hadoopCluster:
           else:
             serviceInfo = xmlrpcClient.getURLs(serviceName)           
             break 
+      except HodInterruptException, h:
+        raise h
       except:
         self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
         self.__log.debug(get_exception_string())
@@ -296,6 +282,8 @@ class hadoopCluster:
                                             self.jobId, self.__hostname, 
                                             serviceName, 'grid', serviceInfo)
         
+      except HodInterruptException, h:
+        raise h
       except:
         self.__log.critical("'%s': registry xmlrpc error." % serviceName)    
         self.__log.debug(get_exception_string())
@@ -326,6 +314,8 @@ class hadoopCluster:
          link):
 
          for i in range(1,5):
+           if hodInterrupt.isSet():
+             raise HodInterruptException()
            try:
              input = urllib.urlopen(link)
              break
@@ -385,6 +375,8 @@ class hadoopCluster:
                
              self.__log.debug("Finished grabbing: %s" % link)
            except AlarmException:
+             if hodInterrupt.isSet():
+               raise HodInterruptException()
              if out: out.close()
              if input: input.close()
              
@@ -403,31 +395,12 @@ class hadoopCluster:
     if 'mapred' in clusterInfo:
       mapredAddress = clusterInfo['mapred'][7:]
       hdfsAddress = clusterInfo['hdfs'][7:]
-  
-      mapredSocket = tcpSocket(mapredAddress)
-        
-      try:
-        mapredSocket.open()
-        mapredSocket.close()
-      except tcpError:
-        status = 14
-  
-      hdfsSocket = tcpSocket(hdfsAddress)
-        
-      try:
-        hdfsSocket.open()
-        hdfsSocket.close()
-      except tcpError:
-        if status > 0:
-          status = 10
-        else:
-          status = 13
-      
+      status = get_cluster_status(hdfsAddress, mapredAddress)
       if status == 0:
         status = 12
     else:
       status = 15
-      
+
     return status
   
   def cleanup(self):
@@ -455,37 +428,67 @@ class hadoopCluster:
       self.__log.critical("Minimum nodes must be greater than 2.")
       status = 2
     else:
-      if self.__check_allocation_manager():
-        nodeSet = self.__nodePool.newNodeSet(min)
-        self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet)
-        if self.jobId:                 
-          if self.__check_job_status():
+      nodeSet = self.__nodePool.newNodeSet(min)
+      walltime = None
+      if self.__cfg['hod'].has_key('walltime'):
+        walltime = self.__cfg['hod']['walltime']
+      self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
+      if self.jobId:
+        try:
+          jobStatus = self.__check_job_status()
+        except HodInterruptException, h:
+          self.__log.info(HOD_INTERRUPTED_MESG)
+          self.delete_job(self.jobId)
+          self.__log.info("Job %s qdelled." % self.jobId)
+          raise h
+
+        if jobStatus:
+          self.__log.info("Hod job successfully submitted. JobId: %s." \
+                                                              % self.jobId)
+          try:
             self.ringmasterXRS = self.__get_ringmaster_client()
+            
+            self.__log.info("Ringmaster at: %s." % self.ringmasterXRS)
+            ringClient = None
             if self.ringmasterXRS:
               ringClient =  hodXRClient(self.ringmasterXRS)
-              
+                
               hdfsStatus, hdfsAddr, self.hdfsInfo = \
                 self.__init_hadoop_service('hdfs', ringClient)
-              
+                
               if hdfsStatus:
+                self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
+  
                 mapredStatus, mapredAddr, self.mapredInfo = \
                   self.__init_hadoop_service('mapred', ringClient)
-                  
+  
                 if mapredStatus:
-                  self.__log.info("HDFS UI on http://%s" % self.hdfsInfo)
                   self.__log.info("Mapred UI on http://%s" % self.mapredInfo)
- 
+  
+                  if self.__cfg['hod'].has_key('update-worker-info') \
+                    and self.__cfg['hod']['update-worker-info']:
+                    workerInfoMap = {}
+                    workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
+                    workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
+                    ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
+                    if ret != 0:
+                      self.__log.warn('Could not update HDFS and Mapred information. ' \
+                                      'User Portal may not show relevant information. ' \
+                                      'Error code=%s' % ret)
+  
+                  self.__cfg.replace_escape_seqs()
+                    
                   # Go generate the client side hadoop-site.xml now
                   # adding final-params as well, just so that conf on 
                   # client-side and server-side are (almost) the same
                   clientParams = None
                   serverParams = {}
                   finalServerParams = {}
-
+  
                   # client-params
                   if self.__cfg['hod'].has_key('client-params'):
                     clientParams = self.__cfg['hod']['client-params']
-
+  
                   # server-params
                   if self.__cfg['gridservice-mapred'].has_key('server-params'):
                     serverParams.update(\
@@ -494,8 +497,8 @@ class hadoopCluster:
                     # note that if there are params in both mapred and hdfs
                    # sections, the ones in hdfs overwrite the ones in mapred
                     serverParams.update(\
-                        self.__cfg['gridservice-mapred']['server-params'])
-                  
+                        self.__cfg['gridservice-hdfs']['server-params'])
+                    
                   # final-server-params
                   if self.__cfg['gridservice-mapred'].has_key(\
                                                     'final-server-params'):
@@ -505,9 +508,14 @@ class hadoopCluster:
                                                     'final-server-params'):
                     finalServerParams.update(\
                         self.__cfg['gridservice-hdfs']['final-server-params'])
-
+  
                   clusterFactor = self.__cfg['hod']['cluster-factor']
-                  self.__hadoopCfg.gen_site_conf(clusterDir, min,
+                  tempDir = self.__cfg['hod']['temp-dir']
+                  if not os.path.exists(tempDir):
+                    os.makedirs(tempDir)
+                  tempDir = os.path.join( tempDir, self.__cfg['hod']['userid']\
+                                  + "." + self.jobId )
+                  self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min,\
                             hdfsAddr, mapredAddr, clientParams,\
                             serverParams, finalServerParams,\
                             clusterFactor)
@@ -520,25 +528,52 @@ class hadoopCluster:
               status = 6
             if status != 0:
               self.__log.info("Cleaning up job id %s, as cluster could not be allocated." % self.jobId)
+              if ringClient is None:
+                self.delete_job(self.jobId)
+              else:
+                self.__log.debug("Calling rm.stop()")
+                ringClient.stopRM()
+                self.__log.debug("Returning from rm.stop()")
+          except HodInterruptException, h:
+            self.__log.info(HOD_INTERRUPTED_MESG)
+            if self.ringmasterXRS:
+              if ringClient is None:
+                ringClient =  hodXRClient(self.ringmasterXRS)
+              self.__log.debug("Calling rm.stop()")
+              ringClient.stopRM()
+              self.__log.debug("Returning from rm.stop()")
+              self.__log.info("Job Shutdown by informing ringmaster.")
+            else:
               self.delete_job(self.jobId)
-          else:
-            self.__log.critical("No job found, ringmaster failed to run.")
-            status = 5 
- 
-        elif self.jobId == False:
-          if exitCode == 188:
-            self.__log.critical("Request execeeded maximum resource allocation.")
-          else:
-            self.__log.critical("Insufficient resources available.")
-          status = 4
-        else:    
-          self.__log.critical("Scheduler failure, allocation failed.\n\n")        
-          status = 4
-      else:
-        status = 9
+              self.__log.info("Job %s qdelled directly." % self.jobId)
+            raise h
+        else:
+          self.__log.critical("No job found, ringmaster failed to run.")
+          status = 5 
+
+      elif self.jobId == False:
+        if exitCode == 188:
+          self.__log.critical("Request exceeded maximum resource allocation.")
+        else:
+          self.__log.critical("Insufficient resources available.")
+        status = 4
+      else:    
+        self.__log.critical("Scheduler failure, allocation failed.\n\n")        
+        status = 4
     
     return status
 
+  def __isRingMasterAlive(self, rmAddr):
+    ret = True
+    rmSocket = tcpSocket(rmAddr)
+    try:
+      rmSocket.open()
+      rmSocket.close()
+    except tcpError:
+      ret = False
+
+    return ret
+
   def deallocate(self, clusterDir, clusterInfo):
     status = 0 
     
@@ -546,6 +581,7 @@ class hadoopCluster:
                                          id=clusterInfo['jobid'])
     self.mapredInfo = clusterInfo['mapred']
     self.hdfsInfo = clusterInfo['hdfs']
+
     try:
       if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
         clusterStatus = self.check_cluster(clusterInfo)
@@ -554,9 +590,35 @@ class hadoopCluster:
           self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
       else:
         self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
+    except HodInterruptException, h:
+      # got an interrupt. just pass and proceed to qdel
+      pass 
     except:
       self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
-    status = self.__nodePool.finalize()
+    
+    rmAddr = None
+    if clusterInfo.has_key('ring'):
+      # format is http://host:port/; we need host:port
+      rmAddr = clusterInfo['ring'][7:]
+      if rmAddr.endswith('/'):
+        rmAddr = rmAddr[:-1]
+
+    if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
+      # Cluster is already dead, don't try to contact ringmaster.
+      self.__nodePool.finalize()
+      status = 10 # As cluster is dead, we just set the status to 'cluster dead'.
+    else:
+      xrsAddr = clusterInfo['ring']
+      rmClient = hodXRClient(xrsAddr)
+      self.__log.debug('calling rm.stop')
+      rmClient.stopRM()
+      self.__log.debug('completed rm.stop')
+
+    # cleanup hod temp dirs
+    tempDir = os.path.join( self.__cfg['hod']['temp-dir'], \
+                    self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'] )
+    if os.path.exists(tempDir):
+      shutil.rmtree(tempDir)
    
     return status
   

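The hodInterrupt.isSet() checkpoints above, and the HodInterruptException they raise, come from hodlib/Common/util.py, which is not part of this hunk. A minimal sketch of such an interrupt latch, assuming the real module wires it to SIGINT in roughly this way:

    import threading

    class HodInterruptException(Exception):
      """Raised at safe checkpoints once an interrupt has been recorded."""
      pass

    class HodInterrupt:
      """Thread-safe latch: a signal handler sets it, worker loops poll it."""
      def __init__(self):
        self.__flag = False
        self.__lock = threading.Lock()

      def setFlag(self, val):
        self.__lock.acquire()
        try:
          self.__flag = val
        finally:
          self.__lock.release()

      def isSet(self):
        self.__lock.acquire()
        try:
          return self.__flag
        finally:
          self.__lock.release()

    hodInterrupt = HodInterrupt()

    def sigHandler(sigNum, frame):
      # Record the interrupt; callers raise HodInterruptException at their
      # next checkpoint so cleanup (qdel, rm.stop) runs in an orderly way.
      hodInterrupt.setFlag(True)

    # The real module presumably installs the handler somewhere central, e.g.:
    #   import signal
    #   signal.signal(signal.SIGINT, sigHandler)

Polling a flag instead of raising directly from the handler keeps the interrupt from firing in the middle of an XML-RPC call or a qsub submission.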
+ 85 - 17
src/contrib/hod/hodlib/Hod/hod.py

@@ -15,7 +15,7 @@
 #limitations under the License.
 # -*- python -*-
 
-import sys, os, getpass, pprint, re, cPickle, random, shutil
+import sys, os, getpass, pprint, re, cPickle, random, shutil, time
 
 import hodlib.Common.logger
 
@@ -23,6 +23,9 @@ from hodlib.ServiceRegistry.serviceRegistry import svcrgy
 from hodlib.Common.xmlrpc import hodXRClient
 from hodlib.Common.util import to_http_url, get_exception_string
 from hodlib.Common.util import get_exception_error_string
+from hodlib.Common.util import hodInterrupt, HodInterruptException
+from hodlib.Common.util import HOD_INTERRUPTED_CODE
+
 from hodlib.Common.nodepoolutil import NodePoolUtil
 from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
 
@@ -115,6 +118,9 @@ class hodRunner:
                                    level=self.__cfg['hod']['debug'], 
                                    addToLoggerNames=(self.__user ,))
 
+  def get_logger(self):
+    return self.__log
+
   def __setup_cluster_logger(self, directory):
     self.__baseLogger.add_file(logDirectory=directory, level=4, 
                                addToLoggerNames=(self.__user ,))
@@ -124,6 +130,8 @@ class hodRunner:
 
   def __norm_cluster_dir(self, directory):
     directory = os.path.expanduser(directory)
+    if not os.path.isabs(directory):
+      directory = os.path.join(self.__cfg['hod']['original-dir'], directory)
     directory = os.path.abspath(directory)
     
     return directory
@@ -202,7 +210,18 @@ class hodRunner:
             self.__opCode = self.__cluster.check_cluster(clusterInfo)
             if self.__opCode == 0 or self.__opCode == 15:
               self.__setup_service_registry()   
-              allocateStatus = self.__cluster.allocate(clusterDir, min, max)    
+              if hodInterrupt.isSet(): 
+                self.__cleanup()
+                raise HodInterruptException()
+              self.__log.info("Service Registry Started.")
+              try:
+                allocateStatus = self.__cluster.allocate(clusterDir, min, max)    
+              except HodInterruptException, h:
+                self.__cleanup()
+                raise h
+              # Allocation has gone through.
+              # Don't care about interrupts any more
+
               if allocateStatus == 0:
                 self.__set_cluster_state_info(os.environ, 
                                               self.__cluster.hdfsInfo, 
@@ -213,6 +232,8 @@ class hodRunner:
                 self.__setup_cluster_state(clusterDir)
                 self.__clusterState.write(self.__cluster.jobId, 
                                           self.__clusterStateInfo)
+                #  Do we need to check for interrupts here ??
+
                 self.__set_user_state_info( 
                   { clusterDir : self.__cluster.jobId, } )
               self.__opCode = allocateStatus
@@ -239,7 +260,15 @@ class hodRunner:
       self.__log.critical("%s operation requires two arguments. "  % operation
                         + "A cluster path and n nodes, or min-max nodes.")
       self.__opCode = 3
-  
+ 
+  def _is_cluster_allocated(self, clusterDir):
+    if os.path.isdir(clusterDir):
+      self.__setup_cluster_state(clusterDir)
+      clusterInfo = self.__clusterState.read()
+      if clusterInfo != {}:
+        return True
+    return False
+
   def _op_deallocate(self, args):
     operation = "deallocate"
     argLength = len(args)
@@ -293,25 +322,19 @@ class hodRunner:
         clusterStatus = self.__cluster.check_cluster(clusterInfo)
         if clusterStatus == 12:
           self.__log.info(clusterDir)
-          keys = clusterInfo.keys()
-          keys.sort()
-          for key in keys:
-            if key != 'env':
-              self.__log.info("%s\t%s" % (key, clusterInfo[key]))  
-            
-          if self.__cfg['hod']['debug'] == 4:
-            for var in clusterInfo['env'].keys():
-              self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+          self.__print_cluster_info(clusterInfo)
         elif clusterStatus == 10:
           self.__log.critical("%s cluster is dead" % clusterDir)
         elif clusterStatus == 13:
           self.__log.warn("%s cluster hdfs is dead" % clusterDir)
         elif clusterStatus == 14:
           self.__log.warn("%s cluster mapred is dead" % clusterDir)
-        
+
         if clusterStatus != 12:
           if clusterStatus == 15:
             self.__log.critical("Cluster %s not allocated." % clusterDir)
+          else:
+            self.__print_cluster_info(clusterInfo)
             
           self.__opCode = clusterStatus
       else:
@@ -321,7 +344,19 @@ class hodRunner:
       self.__log.critical("%s operation requires one argument. "  % operation
                         + "A cluster path.")
       self.__opCode = 3      
-  
+ 
+  def __print_cluster_info(self, clusterInfo):
+    keys = clusterInfo.keys()
+    keys.sort()
+    for key in keys:
+      if key != 'env':
+        self.__log.info("%s\t%s" % (key, clusterInfo[key]))  
+            
+    if self.__cfg['hod']['debug'] == 4:
+      for var in clusterInfo['env'].keys():
+        self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
+
+ 
   def _op_help(self, args):  
     print "hod operations:\n"
     print " allocate <directory> <nodes> - Allocates a cluster of n nodes using the specified cluster"
@@ -342,6 +377,10 @@ class hodRunner:
       opList = self.__check_operation(operation)
       if self.__opCode == 0:
         getattr(self, "_op_%s" % opList[0])(opList)
+    except HodInterruptException, h:
+    self.__log.critical("op: %s failed because of a process interrupt." \
+                                                                % operation)
+      self.__opCode = HOD_INTERRUPTED_CODE
     except:
       self.__log.critical("op: %s failed: %s" % (operation,
                           get_exception_error_string()))
@@ -356,16 +395,41 @@ class hodRunner:
   def script(self):
     script = self.__cfg['hod']['script']
     nodes = self.__cfg['hod']['min-nodes']
+    isExecutable = os.access(script, os.X_OK)
+    if not isExecutable:
+      self.__log.critical('Script %s is not executable.' % script)
+      return 1
+
     clusterDir = "/tmp/%s.%s" % (self.__cfg['hod']['userid'], 
                                  random.randint(0, 20000))
     os.mkdir(clusterDir)
+    ret = 0
     try:
       self._op_allocate(('allocate', clusterDir, str(nodes)))
-      scriptRunner = hadoopScript(clusterDir, 
+      if self.__opCode == 0:
+        if self.__cfg['hod'].has_key('script-wait-time'):
+          time.sleep(self.__cfg['hod']['script-wait-time'])
+          self.__log.debug('Slept for %d seconds. Now going to run the script.' % self.__cfg['hod']['script-wait-time'])
+        if hodInterrupt.isSet():
+          self.__log.debug('Interrupt set - not executing script')
+        else:
+          scriptRunner = hadoopScript(clusterDir, 
                                   self.__cfg['hod']['original-dir'])
-      self.__opCode = scriptRunner.run(script)
-      self._op_deallocate(('deallocate', clusterDir))
+          self.__opCode = scriptRunner.run(script)
+          ret = self.__opCode
+          self.__log.debug("Exit code from running the script: %d" % self.__opCode)
+      else:
+        self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode)
+
+      if hodInterrupt.isSet():
+        # Got an interrupt while executing the script. Unset it so deallocation can proceed.
+        hodInterrupt.setFlag(False)
+      if self._is_cluster_allocated(clusterDir):
+        self._op_deallocate(('deallocate', clusterDir))
       shutil.rmtree(clusterDir, True)
+    except HodInterruptException, h:
+      self.__log.critical("Script failed because of a process interrupt.")
+      self.__opCode = HOD_INTERRUPTED_CODE
     except:
       self.__log.critical("script: %s failed: %s" % (script,
                           get_exception_error_string()))
@@ -373,4 +437,8 @@ class hodRunner:
     
     self.__cleanup()      
     
+    # We want to give importance to a failed script's exit code.
+    if ret != 0:
+      self.__opCode = ret
+
     return self.__opCode

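A short worked example of that exit-code precedence, with hypothetical values:

    # scriptRunner.run() returned 1, so ret == 1.
    # The later deallocate set self.__opCode to, say, 10 (cluster dead).
    # Because ret != 0, __opCode is overwritten and hod exits with 1,
    # so the script's failure is what the caller ultimately sees.
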
+ 4 - 0
src/contrib/hod/hodlib/Hod/nodePool.py

@@ -108,6 +108,10 @@ class NodePool:
     """Delete a job, given it's id"""
     raise NotImplementedError
 
+  def updateWorkerInfo(self, workerInfoMap):
+    """Update information about the workers started by this NodePool."""
+    raise NotImplementedError
+
   def getNextNodeSetId(self):
     id = self.nextNodeSetId
     self.nextNodeSetId += 1

+ 93 - 147
src/contrib/hod/hodlib/HodRing/hodRing.py

@@ -19,13 +19,14 @@
 """
 # -*- python -*-
 import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
-import socket, sets, urllib, csv, signal, pprint, random, re
+import socket, sets, urllib, csv, signal, pprint, random, re, httplib
 
 from xml.dom import getDOMImplementation
 from pprint import pformat
 from optparse import OptionParser
 from urlparse import urlparse
-from hodlib.Common.util import local_fqdn
+from hodlib.Common.util import local_fqdn, parseEquals
+from hodlib.Common.tcp import tcpSocket, tcpError 
 
 binfile = sys.path[0]
 libdir = os.path.dirname(binfile)
@@ -53,6 +54,7 @@ class CommandDesc:
     self.log.debug("In command desc")
     self.log.debug("Done in command desc")
     dict.setdefault('argv', [])
+    dict.setdefault('version', None)
     dict.setdefault('envs', {})
     dict.setdefault('java-opts', [])
     dict.setdefault('workdirs', [])
@@ -83,6 +85,9 @@ class CommandDesc:
   def getArgv(self):
     return self.dict['argv']
 
+  def getVersion(self):
+    return self.dict['version']
+
   def getEnvs(self):
     return self.dict['envs']
 
@@ -243,9 +248,13 @@ class HadoopCommand:
     topElement = doc.documentElement
     topElement.appendChild(comment)
     
-    attr = self.desc.getfinalAttrs()
-    self.createXML(doc, attr, topElement, True)
-    attr = self.desc.getAttrs()
+    finalAttr = self.desc.getfinalAttrs()
+    self.createXML(doc, finalAttr, topElement, True)
+    attr = {}
+    attr1 = self.desc.getAttrs()
+    for k,v in attr1.iteritems():
+      if not finalAttr.has_key(k):
+        attr[k] = v
     self.createXML(doc, attr, topElement, False)
               
     
@@ -306,7 +315,7 @@ class HadoopCommand:
     fenvs = os.environ
     
     for k, v in envs.iteritems():
-      fenvs[k] = v[0]
+      fenvs[k] = v
     
     self.log.debug(javaOpts)
     fenvs['HADOOP_OPTS'] = ''
@@ -440,6 +449,15 @@ class HodRing(hodBaseService):
     self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
     return hadoopPackage
 
+  def getRunningValues(self):
+    return self.__running.values()
+
+  def getTempDir(self):
+    return self.__tempDir
+
+  def getHadoopLogDirs(self):
+    return self.__hadoopLogDirs
+ 
   def __download_package(self, ringClient):
     self.log.debug("Found download address: %s" % 
                    self._cfg['download-addr'])
@@ -523,6 +541,75 @@ class HodRing(hodBaseService):
         continue
       self.__running[id-1] = cmd
 
+      # ok.. now the command is running. If this HodRing runs the jobtracker,
+      # return only after it is ready to accept jobs.
+      self.__check_jobtracker(desc, id-1)
+      
+  def __check_jobtracker(self, desc, id):
+    # Check jobtracker status. Return only once it is ready to accept jobs.
+    # Currently checks for Jetty to come up, the last thing that can be checked
+    # before the JT completes initialisation. To be perfectly reliable, we need
+    # support from hadoop itself.
+    name = desc.getName()
+    if name == 'jobtracker':
+      # Yes I am the Jobtracker
+      self.log.debug("Waiting for jobtracker to initialise")
+      version = desc.getVersion()
+      self.log.debug("jobtracker version : %s" % version)
+      attrs = self.getRunningValues()[id].getFilledInKeyValues()
+      attrs = parseEquals(attrs)
+      jobTrackerAddr = attrs['mapred.job.tracker']
+      self.log.debug("jobtracker rpc server : %s" % jobTrackerAddr)
+      if version < 16:
+        jettyAddr = jobTrackerAddr.split(':')[0] + ':' + \
+                              attrs['mapred.job.tracker.info.port']
+      else:
+        jettyAddr = attrs['mapred.job.tracker.http.bindAddress']
+      self.log.debug("Jobtracker jetty : %s" % jettyAddr)
+
+      # Check for Jetty to come up
+      # For this, do an HTTP HEAD request and then look at the status.
+      defaultTimeout = socket.getdefaulttimeout()
+      # socket timeout isn't exposed at httplib level. Setting explicitly.
+      socket.setdefaulttimeout(1)
+      sleepTime = 0.5
+      jettyStatus = False
+      jettyStatusmsg = ""
+      while sleepTime <= 32:
+        try:
+          jettyConn = httplib.HTTPConnection(jettyAddr)
+          jettyConn.request("HEAD", "/jobtracker.jsp")
+          # httplib inherently retries the following till socket timeout
+          resp = jettyConn.getresponse()
+          if resp.status != 200:
+            # Some problem?
+            jettyStatus = False
+            jettyStatusmsg = "Jetty gave a non-200 response to an HTTP HEAD" +\
+                             " request. HTTP Status (Code, Msg): (%s, %s)" % \
+                             ( resp.status, resp.reason )
+            break
+          else:
+            self.log.info("Jetty returned a 200 status (%s)" % resp.reason)
+            self.log.info("JobTracker successfully initialised")
+            return
+        except socket.error:
+          self.log.debug("Jetty gave a socket error. Sleeping for %s" \
+                                                                  % sleepTime)
+          time.sleep(sleepTime)
+          sleepTime = sleepTime * 2
+        except Exception, e:
+          jettyStatus = False
+          jettyStatusmsg = ("Process (possibly other than jetty) running on" + \
+                  " the port assigned to jetty is returning an invalid http response")
+          break
+      socket.setdefaulttimeout(defaultTimeout)
+      if not jettyStatus:
+        self.log.critical("Jobtracker failed to initialise.")
+        if jettyStatusmsg:
+          self.log.critical( "Reason: %s" % jettyStatusmsg )
+        else: self.log.critical("Reason: Jetty failed to give a response")
+        raise Exception("JobTracker failed to initialise")
+
   def stop(self):
     self.log.debug("Entered hodring stop.")
     if self._http: 
@@ -532,153 +619,12 @@ class HodRing(hodBaseService):
     self.log.debug("call hodsvcrgy stop...")
     hodBaseService.stop(self)
     
-    self.clean_up()
-    
-  def clean_up(self):
-    os.chdir(originalcwd)
-    if not mswindows:
-      # do the UNIX double-fork magic, see Stevens' "Advanced 
-      # Programming in the UNIX Environment" for details (ISBN 0201563177)
-      try: 
-        pid = os.fork() 
-        if pid > 0:
-          # exit first parent
-          sys.exit(0) 
-      except OSError, e: 
-        self.log.error("fork #1 failed: %d (%s)" % (e.errno, e.strerror)) 
-        sys.exit(1)
-
-      # decouple from parent environment
-      os.chdir("/") 
-      os.setsid() 
-      os.umask(0) 
-
-      # do second fork
-      try: 
-        pid = os.fork() 
-        if pid > 0:
-          # exit from second parent, print eventual PID before
-          sys.exit(0) 
-      except OSError, e: 
-        self.log.error("fork #2 failed: %d (%s)" % (e.errno, e.strerror))
-        sys.exit(1) 
-
-    try:
-#      for cmd in self.__running.values():
-#        self.log.debug("killing %s..." % cmd)
-#        cmd.kill()
-  
-      list = []
-      
-      for cmd in self.__running.values():
-        self.log.debug("addding %s to cleanup list..." % cmd)
-        cmd.addCleanup(list)
-      
-      list.append(self.__tempDir)
-         
-      self.__archive_logs()   
-          
-      for dir in list:
-        if os.path.exists(dir):
-          self.log.debug('removing %s' % (dir))
-          shutil.rmtree(dir, True)
-    except:
-      self.log.error(get_exception_string())
-    sys.exit(0)
-
   def _xr_method_clusterStart(self, initialize=True):
     return self.clusterStart(initialize)
 
   def _xr_method_clusterStop(self):
     return self.clusterStop()
  
-  def __copy_archive_to_dfs(self, archiveFile):        
-    hdfsURIMatch = reHdfsURI.match(self._cfg['log-destination-uri'])
-    
-    # FIXME this is a complete and utter hack. Currently hadoop is broken
-    # and does not understand hdfs:// syntax on the command line :(
-    
-    pid = os.getpid()
-    tempConfDir = '/tmp/%s' % pid
-    os.mkdir(tempConfDir)
-    tempConfFileName = '%s/hadoop-site.xml' % tempConfDir
-    tempHadoopConfig = open(tempConfFileName, 'w')
-    print >>tempHadoopConfig, "<configuration>"
-    print >>tempHadoopConfig, "  <property>"
-    print >>tempHadoopConfig, "    <name>fs.default.name</name>"
-    print >>tempHadoopConfig, "    <value>%s</value>" % hdfsURIMatch.group(1)
-    print >>tempHadoopConfig, "    <description>No description</description>"
-    print >>tempHadoopConfig, "  </property>"
-    print >>tempHadoopConfig, "</configuration>"
-    tempHadoopConfig.close()
-    
-    # END LAME HACK
-    
-    (head, tail) = os.path.split(archiveFile)
-    destFile = os.path.join(hdfsURIMatch.group(2), self._cfg['userid'], 
-                            self._cfg['service-id'], tail)
-    
-    self.log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
-    
-    runningHadoops = self.__running.values()
-    if (len(runningHadoops) == 0):
-      self.log.info("len(runningHadoops) == 0, No running cluster?")
-      self.log.info("Skipping __copy_archive_to_dfs")
-      return
- 
-    run = runningHadoops[0]
-    hadoopCmd = run.path
-    if self._cfg.has_key('pkgs'):
-      hadoopCmd = os.path.join(self._cfg['pkgs'], 'bin', 'hadoop')
-
-    # LAME HACK AGAIN, using config generated above :( 
-    copyCommand = "%s --config %s dfs -copyFromLocal %s %s" % (hadoopCmd, 
-      tempConfDir, archiveFile, destFile)
-    
-    self.log.debug(copyCommand)
-    
-    copyThread = simpleCommand('hadoop', copyCommand)
-    copyThread.start()
-    copyThread.wait()
-    copyThread.join()
-    self.log.debug(pprint.pformat(copyThread.output()))
-    
-    # LAME HACK AGAIN, deleting config generated above :( 
-    os.unlink(tempConfFileName)
-    os.rmdir(tempConfDir)
-    os.unlink(archiveFile)
-  
-  def __archive_logs(self):
-    status = True
-    if self._cfg.has_key("log-destination-uri"):
-      try:
-        if self.__hadoopLogDirs:
-          date = time.localtime()
-          for logDir in self.__hadoopLogDirs:
-            (head, tail) = os.path.split(logDir)
-            (head, logType) = os.path.split(head)
-            tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
-              logType, local_fqdn(), date[0], date[1], date[2], date[3], 
-              date[4], date[5], random.randint(0,1000))
-            
-            if self._cfg["log-destination-uri"].startswith('file://'):
-              tarBallFile = os.path.join(self._cfg["log-destination-uri"][7:], 
-                                         tarBallFile)
-            else:
-              tarBallFile = os.path.join(self._cfg['temp-dir'], tarBallFile)
-            
-            self.log.info('archiving log files to: %s' % tarBallFile)
-            status = tar(tarBallFile, logDir, ['*',])
-            self.log.info('archive %s status: %s' % (tarBallFile, status))
-            if status and \
-              self._cfg["log-destination-uri"].startswith('hdfs://'):
-              self.__copy_archive_to_dfs(tarBallFile)
-          dict = {} 
-      except:
-        self.log.error(get_exception_string())
-      
-    return status
-      
   def start(self):
     """Run and maintain hodring commands"""
     

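parseEquals, imported above alongside local_fqdn, lives in hodlib/Common/util.py and is not shown in this patch; given how __check_jobtracker indexes its result, it presumably turns a list of 'key=value' strings into a dictionary. A minimal sketch under that assumption:

    def parseEquals(keyValList):
      # Hypothetical stand-in for hodlib.Common.util.parseEquals:
      # ['mapred.job.tracker=host:50020', ...] ->
      #     {'mapred.job.tracker': 'host:50020', ...}
      ret = {}
      for elem in keyValList:
        (key, value) = elem.split('=', 1)
        ret[key] = value
      return ret
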
+ 14 - 6
src/contrib/hod/hodlib/NodePools/torque.py

@@ -150,7 +150,8 @@ class TorquePool(NodePool):
         break
       
       argList.extend(process_qsub_attributes())
-      argList.extend(('-N', 'HOD'))
+
+      argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))
       argList.extend(('-r','n'))
 
       if 'pbs-user' in self._cfg['resource_manager']:
@@ -161,9 +162,11 @@ class TorquePool(NodePool):
         queue = self._cfg['resource_manager']['queue']
         argList.extend(('-q',queue))
   
-      # accounting should recognize userid:pbs-account as being "charged"
-      argList.extend(('-A', (self._cfg['hod']['userid'] + ':' + 
-                   self._cfg['resource_manager']['pbs-account'])))
+      # In HOD 0.4, we pass in an account string only if it is mentioned.
+      # Also, we don't append the userid to the account string, as HOD jobs run
+      # as the user running them, not as a special 'HOD' user.
+      if self._cfg['resource_manager'].has_key('pbs-account'):
+        argList.extend(('-A', (self._cfg['resource_manager']['pbs-account'])))
     
       if 'env-vars' in self._cfg['resource_manager']:
         qsub_envs = self._cfg['resource_manager']['env-vars']
@@ -177,7 +180,7 @@ class TorquePool(NodePool):
   def __keyValToString(self, keyValList):
     ret = ""
     for key in keyValList:
-      ret = "%s%s=%s," % (ret, key, keyValList[key][0])
+      ret = "%s%s=%s," % (ret, key, keyValList[key])
     return ret[:-1]
   
   def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
@@ -288,5 +291,10 @@ class TorquePool(NodePool):
   def runWorkers(self, args):
     return self.__torque.pbsdsh(args)
 
-
+  def updateWorkerInfo(self, workerInfoMap, jobId):
+    workerInfoStr = ''
+    for key in workerInfoMap.keys():
+      workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])
+    exitCode = self.__torque.qalter("notes", workerInfoStr[1:], jobId)
+    return exitCode
 

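Given the workerInfoMap built in hadoop.py above, and with the leading comma stripped by workerInfoStr[1:], the command issued through the torque interface would look roughly like this (host, ports and job id are hypothetical):

    qalter -W notes="HDFS UI:http://host1:53410,Mapred UI:http://host1:53411" 1234.torque-server
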
+ 57 - 34
src/contrib/hod/hodlib/RingMaster/idleJobTracker.py

@@ -16,7 +16,20 @@
 import os, re, time
 from hodlib.Common.threads import loop, func
 from hodlib.Common.threads import simpleCommand
-from hodlib.Common.util import get_exception_string
+from hodlib.Common.util import get_exception_string, hadoopVersion
+
+class HadoopJobStatus:
+  """This class represents the status of a single Hadoop job"""
+  
+  def __init__(self, jobId, status):
+    self.__jobId = jobId
+    self.__status = status
+
+  def getJobId(self):
+    return self.__jobId
+
+  def getStatus(self):
+    return self.__status
 
 class JobTrackerMonitor:
   """This class monitors the JobTracker of an allocated cluster
@@ -39,9 +52,11 @@ class JobTrackerMonitor:
     # The service info provider will be polled until we get the URL.
     self.__serviceInfoProvider = servInfoProvider
     self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")
+    self.__jobStatusRegExp = re.compile("(\S+)\s+(\d)\s+\d+\s+\S+$")
     self.__firstIdleTime = 0
+    self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
     #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
-    if not self.__isCompatibleHadoopVersion():
+    if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
       raise Exception('Incompatible Hadoop Version: Cannot check status')
     self.__stopFlag = False
     self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
@@ -87,6 +102,36 @@ class JobTrackerMonitor:
     except:
       self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())
 
+  def getJobsStatus(self):
+    """This method should return the status of all jobs that are run on the HOD allocated
+       hadoop cluster"""
+    jobStatusList = []
+    try:
+      hadoop16Version = { 'major' : '0', 'minor' : '16' }
+      if self.__isCompatibleHadoopVersion(hadoop16Version):
+        jtStatusCommand = self.__initStatusCommand(option='-list all')
+        jtStatusCommand.start()
+        jtStatusCommand.wait()
+        jtStatusCommand.join()
+        if jtStatusCommand.exit_code() == 0:
+          for line in jtStatusCommand.output():
+            jobStatus = self.__extractJobStatus(line)
+            if jobStatus is not None:
+              jobStatusList.append(jobStatus)
+    except:
+      self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
+    return jobStatusList
+
+  def __extractJobStatus(self, line):
+    """This method parses an output line from the job status command and creates
+       the JobStatus object if there is a match"""
+    jobStatus = None
+    line = line.strip()
+    jsMatch = self.__jobStatusRegExp.match(line)
+    if jsMatch:
+      jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
+    return jobStatus
+
   def __isIdle(self):
     """This method checks if the JobTracker is idle beyond a certain limit."""
     if self.__getJobCount() == 0:
@@ -121,47 +166,25 @@ class JobTrackerMonitor:
           jobs = int(match.group(1))
     return jobs
 
-  def __findHadoopVersion(self):
-    """This method determines the version of hadoop being used by executing the 
-       hadoop version command"""
-    verMap = { 'major' : None, 'minor' : None }
-    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
-    cmd = "%s version" % hadoopPath
-    self.__log.debug('Executing command %s to find hadoop version' % cmd)
-    env = os.environ
-    env['JAVA_HOME'] = self.__javaHome
-    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
-    hadoopVerCmd.start()
-    hadoopVerCmd.wait()
-    hadoopVerCmd.join()
-    if hadoopVerCmd.exit_code() == 0:
-      verLine = hadoopVerCmd.output()[0]
-      self.__log.debug('Version from hadoop command: %s' % verLine)
-      hadoopVerRegExp = re.compile("Hadoop ([0-9]+)\.([0-9]+).*")
-      verMatch = hadoopVerRegExp.match(verLine)
-      if verMatch != None:
-        verMap['major'] = verMatch.group(1)
-        verMap['minor'] = verMatch.group(2)
-
-    return verMap
-
-  def __isCompatibleHadoopVersion(self):
+  def __isCompatibleHadoopVersion(self, expectedVersion):
     """This method determines whether the version of hadoop being used is one that 
-       provides the hadoop job -list command or not"""
-    ver = self.__findHadoopVersion()
+       is at least as high as the expectedVersion.
+       This can be used for checking if a particular feature is available or not"""
+    ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
     ret = False
   
-    if (ver['major']!=None) and (int(ver['major']) >= 0) \
-      and (ver['minor']!=None) and (int(ver['minor']) >= 15):
+    if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) \
+      and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):
       ret = True
-
     return ret
 
-  def __initStatusCommand(self):
+  def __initStatusCommand(self, option="-list"):
     """This method initializes the command to run to check the JT status"""
     cmd = None
     hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
-    cmdStr = "%s job -jt %s -list" % (hadoopPath, self.__jobTrackerURL)
+    cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
+    cmdStr = "%s %s" % (cmdStr, option)
+    self.__log.debug('cmd str %s' % cmdStr)
     env = os.environ
     env['JAVA_HOME'] = self.__javaHome
     cmd = simpleCommand('HadoopStatus', cmdStr, env)

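The __jobStatusRegExp above is written against the tabular output of hadoop job -list all in 0.16, where each job line carries a job id, a numeric state, a start time and a user name, roughly like this (values hypothetical):

    job_200801180916_0001   2   1200934721984   hadoopuser

The captured second field is the state that __findHadoopJobsExitCode in ringMaster.py below compares against failureStatus (3, i.e. failed).
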
+ 90 - 13
src/contrib/hod/hodlib/RingMaster/ringMaster.py

@@ -28,7 +28,7 @@ libdir = os.path.dirname(binfile)
 sys.path.append(libdir)
 
 import hodlib.Common.logger
-from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor
+from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
 
 from hodlib.Common.threads import func 
 
@@ -484,7 +484,20 @@ class _LogMasterSources:
     
     return addr
 
-  
+  def stopRM(self):
+    """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""
+    # We spawn a thread here because we want the XMLRPC call to return. Calling
+    # stop directly from here will also stop the XMLRPC server.
+    try:
+      self.log.debug("inside xml-rpc call to stop ringmaster")
+      rmStopperThread = func('RMStopper', self.rm.stop)
+      rmStopperThread.start()
+      self.log.debug("returning from xml-rpc call to stop ringmaster")
+      return True
+    except:
+      self.log.debug("Exception in stop: %s" % get_exception_string())
+      return False
+
 class RingMaster:
   def __init__(self, cfg, log, **kwds):
     """starts nodepool and services"""
@@ -499,6 +512,8 @@ class RingMaster:
     self.__jtMonitor = None
     self.__idlenessDetected = False
     self.__stopInProgress = False
+    self.__isStopped = False # to let main exit
+    self.__exitCode = 0 # exit code with which the ringmaster main method should return
 
     self.__initialize_signal_handlers()
     
@@ -544,23 +559,33 @@ class RingMaster:
 
       hdfsDesc = sdl['hdfs']
       hdfs = None
+ 
+      # Determine the hadoop version
+      hadoopVers = hadoopVersion(self.__getHadoopDir(), \
+                                self.cfg['hodring']['java-home'], self.log)
+      
       if hdfsDesc.isExternal():
-        hdfs = HdfsExternal(hdfsDesc, workDirs)
+        hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
+        hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
       else:
-        hdfs = Hdfs(hdfsDesc, workDirs, 0)
+        hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']))
 
       self.serviceDict[hdfs.getName()] = hdfs
       
       mrDesc = sdl['mapred']
       mr = None
       if mrDesc.isExternal():
-        mr = MapReduceExternal(mrDesc, workDirs)
+        mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
+        mr.setMasterParams( self.cfg['gridservice-mapred'] )
       else:
-        mr = MapReduce(mrDesc, workDirs,1)
+        mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']))
 
       self.serviceDict[mr.getName()] = mr
     except:
-      self.log.debug(get_exception_string)
+      self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor "
+                        "objects: %s." % get_exception_error_string())
+      self.log.debug(get_exception_string())
+      raise
 
     # should not be starting these in a constructor
     ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)
@@ -860,23 +885,74 @@ class RingMaster:
     
     self._finalize()
 
+  def __findExitCode(self):
+    """Determine the exit code based on the status of the cluster or jobs run on them"""
+    xmlrpcServer = ringMasterServer.instance.logMasterSources
+    if xmlrpcServer.getServiceAddr('hdfs') == 'not found':
+      self.__exitCode = 7
+    elif xmlrpcServer.getServiceAddr('mapred') == 'not found':
+      self.__exitCode = 8
+    else:
+      clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),
+                                          xmlrpcServer.getServiceAddr('mapred'))
+      if clusterStatus != 0:
+        self.__exitCode = clusterStatus
+      else:
+        self.__exitCode = self.__findHadoopJobsExitCode()
+    self.log.debug('exit code %s' % self.__exitCode)
+
+  def __findHadoopJobsExitCode(self):
+    """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
+       this information is available. Return 0 otherwise"""
+    ret = 0
+    failureStatus = 3
+    failureCount = 0
+    if self.__jtMonitor:
+      jobStatusList = self.__jtMonitor.getJobsStatus()
+      try:
+        if len(jobStatusList) > 0:
+          for jobStatus in jobStatusList:
+            self.log.debug('job status for %s: %s' % (jobStatus.getJobId(), 
+                                                      jobStatus.getStatus()))
+            if jobStatus.getStatus() == failureStatus:
+              failureCount = failureCount+1
+        if failureCount > 0:
+          if failureCount == len(jobStatusList): # all jobs failed
+            ret = 16
+          else:
+            ret = 17
+      except:
+        self.log.debug('exception in finding hadoop jobs exit code: %s' % get_exception_string())
+    return ret
+
   def stop(self):
     self.log.debug("RingMaster stop method invoked.")
-    if self.__stopInProgress:
+    if self.__stopInProgress or self.__isStopped:
       return
     self.__stopInProgress = True
-    if self.__jtMonitor is not None:
-      self.__jtMonitor.stop()
     if ringMasterServer.instance is not None:
+      self.log.debug('finding exit code')
+      self.__findExitCode()
       self.log.debug('stopping ringmaster instance')
       ringMasterServer.stopService()
+    else:
+      self.__exitCode = 6
+    if self.__jtMonitor is not None:
+      self.__jtMonitor.stop()
     if self.httpServer:
       self.httpServer.stop()
       
     self.__clean_up()
+    self.__isStopped = True
+
+  def shouldStop(self):
+    """Indicates whether the main loop should exit, either due to idleness condition, 
+    or a stop signal was received"""
+    return self.__idlenessDetected or self.__isStopped
 
-  def isClusterIdle(self):
-    return self.__idlenessDetected
+  def getExitCode(self):
+    """return the exit code of the program"""
+    return self.__exitCode
 
 def main(cfg,log):
   try:
@@ -885,10 +961,11 @@ def main(cfg,log):
     cfg = dGen.initializeDesc()
     rm = RingMaster(cfg, log)
     rm.start()
-    while not rm.isClusterIdle():
+    while not rm.shouldStop():
       time.sleep(1)
     rm.stop()
     log.debug('returning from main')
+    return rm.getExitCode()
   except Exception, e:
     if log:
       log.critical(get_exception_string())

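With main now returning rm.getExitCode(), the ringmaster launcher can hand the consolidated status (7 or 8 when a service address could not be found, the cluster status codes, or 16/17 when some or all hadoop jobs failed) back to the operating system. A minimal sketch of the call site, assuming the usual entry-point pattern:

    if __name__ == '__main__':
      # cfg and log are built by the ringmaster's option parsing and logger
      # setup (not shown); the exit code reaches Torque via sys.exit.
      sys.exit(main(cfg, log))
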
+ 32 - 4
src/contrib/hod/hodlib/Schedulers/torque.py

@@ -28,6 +28,7 @@ class torqueInterface:
     self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')
     self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')
     self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')
+    self.__qalter = os.path.join(torqueDir, 'bin', 'qalter')
     self.__env = environment
     
     self.__log = log
@@ -48,11 +49,23 @@ class torqueInterface:
     while qsubProcess.stdin == None:
       time.sleep(.2)
 
-    for line in stdinList:
-      self.__log.debug("qsub stdin: %s" % line)
-      print >>qsubProcess.stdin, line
+    try:
+      for line in stdinList:
+        self.__log.debug("qsub stdin: %s" % line)
+        print >>qsubProcess.stdin, line
+      qsubProcess.stdin.close()
+    except IOError, i:
+      # If torque's qsub is given invalid params, it fails and returns
+      # immediately, so the write to its stdin raises IOError.
+      # Wait for command execution to finish and report the failure.
+      qsubProcess.wait()
+      qsubProcess.join()
+      output = qsubProcess.output()
+      if output!=[]:
+        self.__log.critical("qsub failure: %s" % output[0].strip())
+        self.__log.critical("qsub command: %s" % qsubCommand)
+      return None, qsubProcess.exit_code()
 
-    qsubProcess.stdin.close()
     qsubProcess.wait()
     qsubProcess.join()
     
@@ -145,3 +158,18 @@ class torqueInterface:
     if not status: status = 0
       
     return status  
+
+  def qalter(self, fieldName, fieldValue, jobId):
+    """Update the job field with fieldName with the fieldValue.
+       The fieldValue must be modifiable after the job is submitted."""
+
+    # E.g. to alter the comment: qalter -W notes='value' jobId
+    qalterCmd = '%s -W %s=\"%s\" %s' % (self.__qalter, fieldName, fieldValue, jobId) 
+    self.__log.debug("qalter command: %s" % qalterCmd)
+    qalterProcess = simpleCommand('qalter', qalterCmd, env=self.__env)
+    qalterProcess.start()
+    qalterProcess.wait()
+    qalterProcess.join()
+    exitCode = qalterProcess.exit_code()
+
+    return exitCode

+ 1 - 1
src/docs/src/documentation/content/xdocs/hod.xml

@@ -144,7 +144,7 @@
               <em>Twisted Python:</em> This can be used for improving the scalability of HOD. Twisted Python is available <a href="http://twistedmatrix.com/trac/">here</a>.
             </li>
             <li>
-            <em>Hadoop:</em> HOD can automatically distribute Hadoop to all nodes in the cluster. However, it can also use a pre-installed version of Hadoop, if it is available on all nodes in the cluster. HOD currently supports only Hadoop 0.16, which is under development.
+            <em>Hadoop:</em> HOD can automatically distribute Hadoop to all nodes in the cluster. However, it can also use a pre-installed version of Hadoop, if it is available on all nodes in the cluster. HOD currently supports Hadoop 0.15 and above.
             </li>
           </ul>
           <p>