Kaynağa Gözat

HADOOP-2899. [HOD] Cleans up hdfs:///mapredsystem directory after deallocation. Contributed by Hemanth Yamijala.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@639219 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 yıl önce
ebeveyn
işleme
f036215e78

+ 3 - 0
CHANGES.txt

@@ -120,6 +120,9 @@ Trunk (unreleased changes)
     HADOOP-2848. [HOD]hod -o list and deallocate works even after deleting
     the cluster directory. (Hemanth Yamijala via ddas)
 
+    HADOOP-2899. [HOD] Cleans up hdfs:///mapredsystem directory after
+    deallocation. (Hemanth Yamijala via ddas) 
+
   OPTIMIZATIONS
 
     HADOOP-2790.  Fixed inefficient method hasSpeculativeTask by removing

+ 10 - 27
src/contrib/hod/bin/hodcleanup

@@ -51,11 +51,12 @@ from hodlib.Common.util import local_fqdn, tar, filter_warnings,\
                             get_exception_string, get_exception_error_string
 from hodlib.Common.logger import hodLog
 from hodlib.Common.logger import getLogger
+from hodlib.HodRing.hodRing import createMRSystemDirectoryManager
 
 filter_warnings()
 
 reVersion = re.compile(".*(\d+_\d+).*")
-reHdfsURI = re.compile("hdfs://(.*?:\d+)(.*)")
+reHdfsURI = re.compile("(hdfs://.*?:\d+)(.*)")
 
 VERSION = None
 if os.path.exists("./VERSION"):
@@ -103,25 +104,6 @@ def __copy_archive_to_dfs(conf, archiveFile):
   # need log-destination-uri, hadoopCommandstring and/or pkgs
   hdfsURIMatch = reHdfsURI.match(conf['log-destination-uri'])
   
-  # FIXME this is a complete and utter hack. Currently hadoop is broken
-  # and does not understand hdfs:// syntax on the command line :(
-  
-  pid = os.getpid()
-  tempConfDir = '/tmp/%s' % pid
-  os.mkdir(tempConfDir)
-  tempConfFileName = '%s/hadoop-site.xml' % tempConfDir
-  tempHadoopConfig = open(tempConfFileName, 'w')
-  print >>tempHadoopConfig, "<configuration>"
-  print >>tempHadoopConfig, "  <property>"
-  print >>tempHadoopConfig, "  <name>fs.default.name</name>"
-  print >>tempHadoopConfig, "  <value>%s</value>" % hdfsURIMatch.group(1)
-  print >>tempHadoopConfig, "  <description>No description</description>"
-  print >>tempHadoopConfig, "  </property>"
-  print >>tempHadoopConfig, "</configuration>"
-  tempHadoopConfig.close()
-  
-  # END LAME HACK
-  
   (head, tail) = os.path.split(archiveFile)
   destFile = os.path.join(hdfsURIMatch.group(2), conf['user-id'], 'hod-logs', conf['service-id'], tail)
   
@@ -131,9 +113,8 @@ def __copy_archive_to_dfs(conf, archiveFile):
   if conf['pkgs']:
     hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop')
 
-  # LAME HACK AGAIN, using config generated above :( 
-  copyCommand = "%s --config %s dfs -copyFromLocal %s %s" % (hadoopCmd, 
-    tempConfDir, archiveFile, destFile)
+  copyCommand = "%s dfs -fs %s -copyFromLocal %s %s" % (hadoopCmd, 
+    hdfsURIMatch.group(1), archiveFile, destFile)
   
   log.debug(copyCommand)
   
@@ -143,9 +124,6 @@ def __copy_archive_to_dfs(conf, archiveFile):
   copyThread.join()
   log.debug(pprint.pformat(copyThread.output()))
   
-  # LAME HACK AGAIN, deleting config generated above :( 
-  os.unlink(tempConfFileName)
-  os.rmdir(tempConfDir)
   os.unlink(archiveFile)
 
 def unpack():
@@ -153,7 +131,8 @@ def unpack():
   option_list=["--log-destination-uri", "--hadoop-log-dirs", \
           "--temp-dir", "--hadoop-command-string", "--pkgs", "--user-id", \
           "--service-id", "--hodring-debug", "--hodring-log-dir", \
-          "--hodring-syslog-address", "--hodring-cleanup-list"]
+          "--hodring-syslog-address", "--hodring-cleanup-list", \
+          "--jt-pid", "--mr-sys-dir", "--fs-name", "--hadoop-path"]
   regexp = re.compile("^--")
   for opt in option_list:
     parser.add_option(opt,dest=regexp.sub("",opt),action="store")
@@ -185,6 +164,10 @@ if __name__ == '__main__':
     # Use the same log as hodring
     log = getLogger(conf['hodring'],'hodring')
     log.debug("Logger initialised successfully")
+    mrSysDirManager = createMRSystemDirectoryManager(conf, log)
+    if mrSysDirManager is not None:
+      mrSysDirManager.removeMRSystemDirectory()
+
     status =  __archive_logs(conf,log)
     log.info("Archive status : %s" % status)
     list = conf['hodring-cleanup-list'].split(',')

+ 8 - 2
src/contrib/hod/bin/hodring

@@ -46,7 +46,7 @@ sys.path.append(libDirectory)
 
 from hodlib.HodRing.hodRing import HodRing
 from hodlib.Common.setup import *
-from hodlib.Common.util import filter_warnings,get_exception_string, get_exception_error_string
+from hodlib.Common.util import filter_warnings,get_exception_string, get_exception_error_string, getMapredSystemDirectory
 from hodlib.Common.logger import getLogger, ensureLogDir
 
 filter_warnings()
@@ -160,8 +160,11 @@ if __name__ == '__main__':
     list = []
     
     runningHadoops = service.getRunningValues()
-      
+
+    mrSysDirManager = None      
     for cmd in runningHadoops:
+      if cmd.name == 'jobtracker':
+        mrSysDirManager = cmd.getMRSystemDirectoryManager()
       log.debug("addding %s to cleanup list..." % cmd)
       cmd.addCleanup(list)
     
@@ -211,6 +214,9 @@ if __name__ == '__main__':
     if service._cfg.has_key('pkgs'):
       cmdString = cmdString + " --pkgs " + service._cfg['pkgs']
 
+    if mrSysDirManager is not None:
+      cmdString = "%s %s" % (cmdString, mrSysDirManager.toCleanupArgs())
+
     log.info("cleanup commandstring : ")
     log.info(cmdString)
 

+ 4 - 0
src/contrib/hod/hodlib/Common/threads.py

@@ -162,6 +162,10 @@ class simpleCommand(baseThread):
         
         sys.exit(0)
 
+    def getPid(self):
+        """return pid of the launches process"""
+        return self.__pid
+
     def output(self):
         return self.__outputBuffer[:]
 

+ 11 - 1
src/contrib/hod/hodlib/Common/util.py

@@ -13,7 +13,7 @@
 #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #See the License for the specific language governing permissions and
 #limitations under the License.
-import sys, os, traceback, stat, socket, re, warnings, signal
+import errno, sys, os, traceback, stat, socket, re, warnings, signal
 
 from hodlib.Common.tcp import tcpSocket, tcpError 
 from hodlib.Common.threads import simpleCommand
@@ -33,6 +33,16 @@ class AlarmException(Exception):
     def __repr__(self):
         return self.message
 
+def isProcessRunning(pid):
+    '''Check if a process is running, by sending it a 0 signal, and checking for errors'''
+    # This method is documented in some email threads on the python mailing list.
+    # For e.g.: http://mail.python.org/pipermail/python-list/2002-May/144522.html
+    try:
+      os.kill(pid, 0)
+      return True
+    except OSError, err:
+      return err.errno == errno.EPERM
+
 def untar(file, targetDir):
     status = False
     command = 'tar -C %s -zxf %s' % (targetDir, file)

+ 67 - 2
src/contrib/hod/hodlib/HodRing/hodRing.py

@@ -25,7 +25,7 @@ from xml.dom import getDOMImplementation
 from pprint import pformat
 from optparse import OptionParser
 from urlparse import urlparse
-from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory
+from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory, isProcessRunning
 from hodlib.Common.tcp import tcpSocket, tcpError 
 
 binfile = sys.path[0]
@@ -146,7 +146,67 @@ class CommandDesc:
 
   _parseMap = staticmethod(_parseMap)
 
-      
+class MRSystemDirectoryManager:
+  """Class that is responsible for managing the MapReduce system directory"""
+
+  def __init__(self, jtPid, mrSysDir, fsName, hadoopPath, log, retries=120):
+    self.__jtPid = jtPid
+    self.__mrSysDir = mrSysDir
+    self.__fsName = fsName
+    self.__hadoopPath = hadoopPath
+    self.__log = log
+    self.__retries = retries
+
+  def toCleanupArgs(self):
+    return " --jt-pid %s --mr-sys-dir %s --fs-name %s --hadoop-path %s " \
+              % (self.__jtPid, self.__mrSysDir, self.__fsName, self.__hadoopPath)
+
+  def removeMRSystemDirectory(self):
+    
+    jtActive = isProcessRunning(self.__jtPid)
+    count = 0 # try for a max of a minute for the process to end
+    while jtActive and (count<self.__retries):
+      time.sleep(0.5)
+      jtActive = isProcessRunning(self.__jtPid)
+      count += 1
+    
+    if count == self.__retries:
+      self.__log.warn('Job Tracker did not exit even after a minute. Not going to try and cleanup the system directory')
+      return
+
+    self.__log.debug('jt is now inactive')
+
+    cmd = "%s dfs -fs hdfs://%s -rmr %s" % (self.__hadoopPath, self.__fsName, \
+                                            self.__mrSysDir)
+    self.__log.debug('Command to run to remove system directory: %s' % (cmd))
+    try:
+      hadoopCommand = simpleCommand('mr-sys-dir-cleaner', cmd)
+      hadoopCommand.start()
+      hadoopCommand.wait()
+      hadoopCommand.join()
+      ret = hadoopCommand.exit_code()
+      if ret != 0:
+        self.__log.warn("Error in removing MapReduce system directory '%s' from '%s' using path '%s'" \
+                          % (self.__mrSysDir, self.__fsName, self.__hadoopPath))
+        self.__log.warn(pprint.pformat(hadoopCommand.output()))
+      else:
+        self.__log.info("Removed MapReduce system directory successfully.")
+    except:
+      self.__log.error('Exception while cleaning up MapReduce system directory. May not be cleaned up. %s', \
+                          get_exception_error_string())
+      self.__log.debug(get_exception_string())
+
+
+def createMRSystemDirectoryManager(dict, log):
+  keys = [ 'jt-pid', 'mr-sys-dir', 'fs-name', 'hadoop-path' ]
+  for key in keys:
+    if (not dict.has_key(key)) or (dict[key] is None):
+      return None
+
+  mrSysDirManager = MRSystemDirectoryManager(int(dict['jt-pid']), dict['mr-sys-dir'], \
+                                              dict['fs-name'], dict['hadoop-path'], log)
+  return mrSysDirManager
+
 class HadoopCommand:
   """Runs a single hadoop command"""
     
@@ -296,6 +356,11 @@ class HadoopCommand:
     
     return prop
 
+  def getMRSystemDirectoryManager(self):
+    return MRSystemDirectoryManager(self.__hadoopThread.getPid(), self.__mrSysDir, \
+                                    self.desc.getfinalAttrs()['fs.default.name'], \
+                                    self.path, self.log)
+
   def run(self, dir):
     status = True
     args = []