Browse Source

HADOOP-2924. Fixes an address problem to do with TaskTracker binding to an address. Contributed by Vinod Kumar Vavilapalli.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@637776 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
099a321e7e

+ 3 - 0
CHANGES.txt

@@ -247,6 +247,9 @@ Trunk (unreleased changes)
     HADOOP-2806. Fixes a streaming document.
     HADOOP-2806. Fixes a streaming document.
     (Amareshwari Sriramadasu via ddas)
     (Amareshwari Sriramadasu via ddas)
 
 
+    HADOOP-2924. Fixes an address problem to do with TaskTracker binding 
+    to an address. (Vinod Kumar Vavilapalli via ddas)
+
 Release 0.16.1 - 2008-03-13
 Release 0.16.1 - 2008-03-13
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 0 - 39
src/contrib/hod/hodlib/GridServices/hdfs.py

@@ -147,43 +147,6 @@ class Hdfs(MasterSlave):
   def _parseEquals(self, list):
   def _parseEquals(self, list):
     return parseEquals(list)
     return parseEquals(list)
   
   
-  def _getNameNodePort(self):
-    sd = self.serviceDesc
-    attrs = sd.getfinalAttrs()
-    if not 'fs.default.name' in attrs:
-      return ServiceUtil.getUniqPort()
-
-    v = attrs['fs.default.name']
-    try:
-      [n, p] = v.split(':', 1)
-      return int(p)
-    except:
-      print get_exception_string()
-      raise ValueError, "Can't find port from attr fs.default.name: %s" % (v)
-
-  def _getNameNodeInfoPort(self):
-    sd = self.serviceDesc
-    attrs = sd.getfinalAttrs()
-    if self.version < 16:
-      if 'dfs.info.bindAddress' not in attrs:
-        return ServiceUtil.getUniqPort()
-    else:
-      if 'dfs.http.address' not in attrs:
-        return ServiceUtil.getUniqPort()
-
-    if self.version < 16:
-      p = attrs['dfs.info.port']
-    else:
-      p = attrs['dfs.http.address'].split(':')[1]
-    try:
-      return int(p)
-    except:
-      print get_exception_string()
-      if self.version < 16:
-        raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
-      else:
-        raise ValueError, "Can't find port from attr dfs.http.address: %s" % (p)
-
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
     namedir = None
     namedir = None
     datadir = []
     datadir = []
@@ -214,11 +177,9 @@ class Hdfs(MasterSlave):
     workDirs = []
     workDirs = []
     attrs = sd.getfinalAttrs().copy()
     attrs = sd.getfinalAttrs().copy()
     envs = sd.getEnvs().copy()
     envs = sd.getEnvs().copy()
-    #self.masterPort = port = self._getNameNodePort()
     
     
     if 'fs.default.name' not in attrs:
     if 'fs.default.name' not in attrs:
       attrs['fs.default.name'] = 'fillinhostport'
       attrs['fs.default.name'] = 'fillinhostport'
-    #self.infoPort = port = self._getNameNodeInfoPort()
  
  
     if self.version < 16:
     if self.version < 16:
      if 'dfs.info.port' not in attrs:
      if 'dfs.info.port' not in attrs:

+ 0 - 40
src/contrib/hod/hodlib/GridServices/mapred.py

@@ -149,44 +149,6 @@ class MapReduce(MasterSlave):
   def _parseEquals(self, list):
   def _parseEquals(self, list):
     return parseEquals(list)
     return parseEquals(list)
 
 
-  def _getJobTrackerPort(self):
-    sd = self.serviceDesc
-    attrs = sd.getfinalAttrs()
-    if not 'mapred.job.tracker' in attrs:
-      return ServiceUtil.getUniqPort()
-    
-    v = attrs['mapred.job.tracker']
-    try:
-      [n, p] = v.split(':', 1)
-      return int(p)
-    except:
-      print get_exception_string()
-      raise ValueError, "Can't find port from attr mapred.job.tracker: %s" % (v)
-
-  # UNUSED METHOD
-  def _getJobTrackerInfoPort(self):
-    sd = self.serviceDesc
-    attrs = sd.getfinalAttrs()
-    if self.version < 16:
-      if not 'mapred.job.tracker.info.port' in attrs:
-        return ServiceUtil.getUniqPort()
-    else:
-      if 'mapred.job.tracker.http.address' not in attrs:
-        return ServiceUtil.getUniqPort()
-
-    if self.version < 16:
-      p = attrs['mapred.job.tracker.info.port']
-    else:
-      p = attrs['mapred.job.tracker.http.address'].split(':')[1]
-    try:
-      return int(p)
-    except:
-      print get_exception_string()
-      if self.version < 16:
-        raise ValueError, "Can't find port from attr mapred.job.tracker.info.port: %s" % (p)
-      else:
-        raise ValueError, "Can't find port from attr mapred.job.tracker.http.address: %s" % (p)
-
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
     local = []
     local = []
     system = None
     system = None
@@ -223,11 +185,9 @@ class MapReduce(MasterSlave):
     attrs = sd.getfinalAttrs().copy()
     attrs = sd.getfinalAttrs().copy()
     envs = sd.getEnvs().copy()
     envs = sd.getEnvs().copy()
 
 
-    #self.masterPort = port = self._getJobTrackerPort()
     if 'mapred.job.tracker' not in attrs:
     if 'mapred.job.tracker' not in attrs:
       attrs['mapred.job.tracker'] = 'fillinhostport'
       attrs['mapred.job.tracker'] = 'fillinhostport'
 
 
-    #self.infoPort = port = self._getJobTrackerInfoPort()
     if self.version < 16:
     if self.version < 16:
       if 'mapred.job.tracker.info.port' not in attrs:
       if 'mapred.job.tracker.info.port' not in attrs:
         attrs['mapred.job.tracker.info.port'] = 'fillinport'
         attrs['mapred.job.tracker.info.port'] = 'fillinport'

+ 30 - 8
src/contrib/hod/hodlib/GridServices/service.py

@@ -174,21 +174,32 @@ class ServiceUtil:
   service.py to a util file"""
   service.py to a util file"""
   localPortUsed = {}
   localPortUsed = {}
     
     
-  def getUniqRandomPort(h=None, low=50000, high=60000, retry = 30):
+  def getUniqRandomPort(h=None, low=50000, high=60000, retry=900, log=None):
     """This allocates a random free port between low and high"""
     """This allocates a random free port between low and high"""
+    # We use a default value of 900 retries, which takes an agreeable
+    # time limit of ~ 6.2 seconds to check 900 ports, in the worst case
+    # of no available port in those 900.
+
     while retry > 0:
     while retry > 0:
       n = random.randint(low, high)
       n = random.randint(low, high)
       if n in ServiceUtil.localPortUsed:
       if n in ServiceUtil.localPortUsed:
-        retry -= 1
         continue
         continue
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       if not h:
       if not h:
         h = socket.gethostname()
         h = socket.gethostname()
       avail = False
       avail = False
+      if log: log.debug("Trying to see if port %s is available"% n)
       try:
       try:
-        s.connect((h, n))
-      except:
+        s.bind((h, n))
+      except socket.error,e:
+        if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
+        retry -= 1
+        pass
+      else:
+        if log: log.debug("Yes, port %s is available" % n)
         avail = True
         avail = True
+      finally:
+        s.close()
 
 
       if avail:
       if avail:
         ServiceUtil.localPortUsed[n] = True
         ServiceUtil.localPortUsed[n] = True
@@ -197,25 +208,36 @@ class ServiceUtil:
   
   
   getUniqRandomPort = staticmethod(getUniqRandomPort)
   getUniqRandomPort = staticmethod(getUniqRandomPort)
   
   
-  def getUniqPort(h=None, low=40000, high=60000, retry = 30):
+  def getUniqPort(h=None, low=40000, high=60000, retry=900, log=None):
     """get unique port on a host that can be used by service
     """get unique port on a host that can be used by service
     This and its consumer code should disappear when master
     This and its consumer code should disappear when master
     nodes get allocated by nodepool"""
     nodes get allocated by nodepool"""
 
 
+    # We use a default value of 900 retries, which takes an agreeable
+    # time limit of ~ 6.2 seconds to check 900 ports, in the worst case
+    # of no available port in those 900.
+
     n  = low
     n  = low
     while retry > 0:
     while retry > 0:
       n = n + 1
       n = n + 1
       if n in ServiceUtil.localPortUsed:
       if n in ServiceUtil.localPortUsed:
-        retry -= 1
         continue
         continue
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
       if not h:
       if not h:
         h = socket.gethostname()
         h = socket.gethostname()
       avail = False
       avail = False
+      if log: log.debug("Trying to see if port %s is available"% n)
       try:
       try:
-        s.connect((h, n))
-      except:
+        s.bind((h, n))
+      except socket.error,e:
+        if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
+        retry -= 1
+        pass
+      else:
+        if log: log.debug("Yes, port %s is available" % n)
         avail = True
         avail = True
+      finally:
+        s.close()
 
 
       if avail:
       if avail:
         ServiceUtil.localPortUsed[n] = True
         ServiceUtil.localPortUsed[n] = True

+ 2 - 2
src/contrib/hod/hodlib/HodRing/hodRing.py

@@ -190,7 +190,7 @@ class HadoopCommand:
     for k,v in attr.iteritems():
     for k,v in attr.iteritems():
       self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
       self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
       if ( v == "fillinport" ):
       if ( v == "fillinport" ):
-        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000))
+        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
 
 
       keyvalpair = ''
       keyvalpair = ''
       if isinstance(v, (tuple, list)):
       if isinstance(v, (tuple, list)):
@@ -206,7 +206,7 @@ class HadoopCommand:
         self.filledInKeyVals.append(keyvalpair)
         self.filledInKeyVals.append(keyvalpair)
 	
 	
       if ( v == "fillinhostport"):
       if ( v == "fillinhostport"):
-        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000))
+        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
         self.log.debug('Setting hostname to: %s' % local_fqdn())
         self.log.debug('Setting hostname to: %s' % local_fqdn())
         v = local_fqdn() + ':' + port
         v = local_fqdn() + ':' + port