hdfs.py

#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
  14. """define Hdfs as subclass of Service"""
  15. # -*- python -*-
  16. import os
  17. from service import *
  18. from hodlib.Hod.nodePool import *
  19. from hodlib.Common.desc import CommandDesc
  20. from hodlib.Common.util import get_exception_string, parseEquals
class HdfsExternal(MasterSlave):
  """dummy proxy to external HDFS instance"""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['fs.default.name']
    return [addr]

  def setMasterParams(self, dict):
    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])

    if self.version < 16:
      self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
        str(self.serviceDesc.dict['info_port'])
    else:
      # After Hadoop-2185
      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['fs.default.name']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['dfs.info.port']
    else:
      # After Hadoop-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]

class Hdfs(MasterSlave):
  """HDFS service brought up on HOD-allocated nodes: builds the NameNode,
  DataNode and dfsadmin commands that the cluster nodes will run."""

  def __init__(self, serviceDesc, nodePool, required_node, version,
               format=True, upgrade=False):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
    self.runAdminCommands = True
    self.infoAddr = None
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version

  def getMasterRequest(self):
    req = NodeRequest(1, [], False)
    return req

  def getMasterCommands(self, serviceDict):
    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

  def getAdminCommands(self, serviceDict):
    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                         True, True))
    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
    cmdDesc = self._getDataNodeCommand()
    return [cmdDesc]

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def setMasterParams(self, list):
    dict = self._parseEquals(list)
    self.masterAddr = dict['fs.default.name']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
    else:
      # After Hadoop-2185
      self.infoAddr = dict['dfs.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)

  def _getNameNodePort(self):
    sd = self.serviceDesc
    attrs = sd.getfinalAttrs()
    if 'fs.default.name' not in attrs:
      return ServiceUtil.getUniqPort()

    v = attrs['fs.default.name']
    try:
      [n, p] = v.split(':', 1)
      return int(p)
    except:
      print get_exception_string()
      raise ValueError, "Can't find port from attr fs.default.name: %s" % (v)

  def _getNameNodeInfoPort(self):
    sd = self.serviceDesc
    attrs = sd.getfinalAttrs()
    if self.version < 16:
      if 'dfs.info.bindAddress' not in attrs:
        return ServiceUtil.getUniqPort()
    else:
      if 'dfs.http.address' not in attrs:
        return ServiceUtil.getUniqPort()

    if self.version < 16:
      p = attrs['dfs.info.port']
    else:
      p = attrs['dfs.http.address'].split(':')[1]

    try:
      return int(p)
    except:
      print get_exception_string()
      if self.version < 16:
        raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
      else:
        raise ValueError, "Can't find port from attr dfs.http.address: %s" % (p)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'dfs-data')
      datadir.append(dir)
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    # FIXME -- change dfs.client.buffer.dir
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

  def _getNameNodeCommand(self, format=False, upgrade=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    # 'fillinhostport' / 'fillinport' are placeholder values that are
    # replaced with the actual host and a free port when the command is
    # launched on the allocated node.
    #self.masterPort = port = self._getNameNodePort()
    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    #self.infoPort = port = self._getNameNodeInfoPort()
    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing Hadoop-2185, added the following. Earlier versions don't
      # care about this
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'namenode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if format:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'dfsadmin' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['dfsadmin']
    argv.append(adminCommand)
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if wait:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    if ignoreFailures:
      dict['ignorefailures'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getDataNodeCommand(self):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn is None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # Adding the following. Hadoop-2185
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    dict = { 'name' : 'datanode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['datanode']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()

    cmd = CommandDesc(dict)
    return cmd
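
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: shows the shape of the
# 'key=value' parameter list that Hdfs.setMasterParams() consumes and how the
# master node and info address fall out of it. The _parse() helper below is a
# hypothetical stand-in for hodlib.Common.util.parseEquals, assumed to turn
# ['k1=v1', 'k2=v2'] into {'k1': 'v1', 'k2': 'v2'}; the host and port numbers
# are made-up example values.
if __name__ == '__main__':
  def _parse(pairs):
    parsed = {}
    for item in pairs:
      key, value = item.split('=', 1)
      parsed[key] = value
    return parsed

  params = ['fs.default.name=node001:50000',
            'dfs.http.address=node001:50070']
  parsed = _parse(params)
  masterNode = parsed['fs.default.name'].split(':')[0]
  # For version >= 16 the info address is taken directly from
  # dfs.http.address (see Hdfs.setMasterParams above).
  print masterNode, parsed['dfs.http.address']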