#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
  14. """define MapReduce as subclass of Service"""
  15. # -*- python -*-
  16. import os, copy, time
  17. from service import *
  18. from hodlib.Hod.nodePool import *
  19. from hodlib.Common.desc import CommandDesc
  20. from hodlib.Common.util import get_exception_string, parseEquals
  21. class MapReduceExternal(MasterSlave):
  22. """dummy proxy to external MapReduce instance"""
  23. def __init__(self, serviceDesc, workDirs, version):
  24. MasterSlave.__init__(self, serviceDesc, workDirs,None)
  25. self.launchedMaster = True
  26. self.masterInitialized = True
  27. self.version = version
  28. def getMasterRequest(self):
  29. return None
  30. def getMasterCommands(self, serviceDict):
  31. return []
  32. def getAdminCommands(self, serviceDict):
  33. return []
  34. def getWorkerCommands(self, serviceDict):
  35. return []
  36. def getMasterAddrs(self):
  37. attrs = self.serviceDesc.getfinalAttrs()
  38. addr = attrs['mapred.job.tracker']
  39. return [addr]
  40. def needsMore(self):
  41. return 0
  42. def needsLess(self):
  43. return 0
  44. def setMasterParams(self, dict):
  45. self.serviceDesc['final-attrs']['mapred.job.tracker'] = "%s:%s" % (dict['host'],
  46. dict['tracker_port'])
  47. if self.version < 16:
  48. self.serviceDesc.dict['final-attrs']['mapred.job.tracker.info.port'] = \
  49. str(self.serviceDesc.dict['info_port'])
  50. else:
  51. # After Hadoop-2185
  52. self.serviceDesc['final-attrs']['mapred.job.tracker.http.address'] = \
  53. "%s:%s" %(dict['host'], dict['info_port'])
  54. def getInfoAddrs(self):
  55. attrs = self.serviceDesc.getfinalAttrs()
  56. if self.version < 16:
  57. addr = attrs['mapred.job.tracker']
  58. k,v = addr.split( ":")
  59. infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
  60. else:
  61. # After Hadoop-2185
  62. # Note: earlier,we never respected mapred.job.tracker.http.address
  63. infoaddr = attrs['mapred.job.tracker.http.address']
  64. return [infoaddr]
  65. class MapReduce(MasterSlave):
  66. def __init__(self, serviceDesc, workDirs,required_node, version,
  67. workers_per_ring = 1):
  68. MasterSlave.__init__(self, serviceDesc, workDirs,required_node)
  69. self.masterNode = None
  70. self.masterAddr = None
  71. self.infoAddr = None
  72. self.workers = []
  73. self.required_node = required_node
  74. self.version = version
  75. self.workers_per_ring = workers_per_ring
  76. def isLaunchable(self, serviceDict):
  77. hdfs = serviceDict['hdfs']
  78. if (hdfs.isMasterInitialized()):
  79. return True
  80. return False
  81. def getMasterRequest(self):
  82. req = NodeRequest(1, [], False)
  83. return req
  84. def getMasterCommands(self, serviceDict):
  85. hdfs = serviceDict['hdfs']
  86. cmdDesc = self._getJobTrackerCommand(hdfs)
  87. return [cmdDesc]
  88. def getAdminCommands(self, serviceDict):
  89. return []
  90. def getWorkerCommands(self, serviceDict):
  91. hdfs = serviceDict['hdfs']
  92. workerCmds = []
  93. for id in range(1, self.workers_per_ring + 1):
  94. workerCmds.append(self._getTaskTrackerCommand(str(id), hdfs))
  95. return workerCmds
  96. def setMasterNodes(self, list):
  97. node = list[0]
  98. self.masterNode = node
  99. def getMasterAddrs(self):
  100. return [self.masterAddr]
  101. def getInfoAddrs(self):
  102. return [self.infoAddr]
  103. def getWorkers(self):
  104. return self.workers
  105. def requiredNode(self):
  106. return self.required_host
  107. def setMasterParams(self, list):
  108. dict = self._parseEquals(list)
  109. self.masterAddr = dict['mapred.job.tracker']
  110. k,v = self.masterAddr.split(":")
  111. self.masterNode = k
  112. if self.version < 16:
  113. self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
  114. else:
  115. # After Hadoop-2185
  116. self.infoAddr = dict['mapred.job.tracker.http.address']
  117. def _parseEquals(self, list):
  118. return parseEquals(list)
  119. def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
  120. local = []
  121. system = None
  122. temp = None
  123. hadooptmpdir = None
  124. dfsclient = []
  125. for p in parentDirs:
  126. workDirs.append(p)
  127. workDirs.append(os.path.join(p, subDir))
  128. dir = os.path.join(p, subDir, 'mapred-local')
  129. local.append(dir)
  130. if not system:
  131. system = os.path.join(p, subDir, 'mapred-system')
  132. if not temp:
  133. temp = os.path.join(p, subDir, 'mapred-temp')
  134. if not hadooptmpdir:
  135. # Not used currently, generating hadooptmpdir just in case
  136. hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
  137. dfsclientdir = os.path.join(p, subDir, 'dfs-client')
  138. dfsclient.append(dfsclientdir)
  139. workDirs.append(dfsclientdir)
  140. # FIXME!! use csv
  141. attrs['mapred.local.dir'] = ','.join(local)
  142. attrs['mapred.system.dir'] = 'fillindir'
  143. attrs['mapred.temp.dir'] = temp
  144. attrs['dfs.client.buffer.dir'] = ','.join(dfsclient)
  145. attrs['hadoop.tmp.dir'] = hadooptmpdir
  146. envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
  147. def _getJobTrackerCommand(self, hdfs):
  148. sd = self.serviceDesc
  149. parentDirs = self.workDirs
  150. workDirs = []
  151. attrs = sd.getfinalAttrs().copy()
  152. envs = sd.getEnvs().copy()
  153. if 'mapred.job.tracker' not in attrs:
  154. attrs['mapred.job.tracker'] = 'fillinhostport'
  155. if self.version < 16:
  156. if 'mapred.job.tracker.info.port' not in attrs:
  157. attrs['mapred.job.tracker.info.port'] = 'fillinport'
  158. else:
  159. # Addressing Hadoop-2185,
  160. if 'mapred.job.tracker.http.address' not in attrs:
  161. attrs['mapred.job.tracker.http.address'] = 'fillinhostport'
  162. attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
  163. self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')
  164. dict = { 'name' : 'jobtracker' }
  165. dict['version'] = self.version
  166. dict['program'] = os.path.join('bin', 'hadoop')
  167. dict['argv'] = ['jobtracker']
  168. dict['envs'] = envs
  169. dict['pkgdirs'] = sd.getPkgDirs()
  170. dict['workdirs'] = workDirs
  171. dict['final-attrs'] = attrs
  172. dict['attrs'] = sd.getAttrs()
  173. cmd = CommandDesc(dict)
  174. return cmd
  175. def _getTaskTrackerCommand(self, id, hdfs):
  176. sd = self.serviceDesc
  177. parentDirs = self.workDirs
  178. workDirs = []
  179. attrs = sd.getfinalAttrs().copy()
  180. envs = sd.getEnvs().copy()
  181. jt = self.masterAddr
  182. if jt == None:
  183. raise ValueError, "Can't get job tracker address"
  184. attrs['mapred.job.tracker'] = jt
  185. attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
  186. if self.version < 16:
  187. if 'tasktracker.http.port' not in attrs:
  188. attrs['tasktracker.http.port'] = 'fillinport'
  189. # earlier to 16, tasktrackers always took ephemeral port 0 for
  190. # tasktracker.report.bindAddress
  191. else:
  192. # Adding the following. Hadoop-2185
  193. if 'mapred.task.tracker.report.address' not in attrs:
  194. attrs['mapred.task.tracker.report.address'] = 'fillinhostport'
  195. if 'mapred.task.tracker.http.address' not in attrs:
  196. attrs['mapred.task.tracker.http.address'] = 'fillinhostport'
  197. # unique parentDirs in case of multiple tasktrackers per hodring
  198. pd = []
  199. for dir in parentDirs:
  200. dir = dir + "-" + id
  201. pd.append(dir)
  202. parentDirs = pd
  203. # end of unique workdirs
  204. self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')
  205. dict = { 'name' : 'tasktracker' }
  206. dict['program'] = os.path.join('bin', 'hadoop')
  207. dict['argv'] = ['tasktracker']
  208. dict['envs'] = envs
  209. dict['pkgdirs'] = sd.getPkgDirs()
  210. dict['workdirs'] = workDirs
  211. dict['final-attrs'] = attrs
  212. dict['attrs'] = sd.getAttrs()
  213. cmd = CommandDesc(dict)
  214. return cmd