#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""define Hdfs as subclass of Service"""

# -*- python -*-

import os

from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
from hodlib.Common.util import get_exception_string, parseEquals
class HdfsExternal(MasterSlave):
  """dummy proxy to external HDFS instance"""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['fs.default.name']
    return [addr]

  def setMasterParams(self, dict):
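    """Record the namenode address and web UI address of the external HDFS
    instance on the service descriptor, using the host and ports passed in."""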
    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])

    if self.version < 16:
      self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
        str(self.serviceDesc.dict['info_port'])
    else:
      # After HADOOP-2185
      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])
  def getInfoAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['fs.default.name']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['dfs.info.port']
    else:
      # After HADOOP-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]

class Hdfs(MasterSlave):
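  """HDFS service managed by HOD: builds the namenode, datanode and dfsadmin
  commands and tracks the master (namenode) and web UI addresses."""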
  def __init__(self, serviceDesc, nodePool, required_node, version,
               format=True, upgrade=False):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
    self.runAdminCommands = True
    self.infoAddr = None
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version

  def getMasterRequest(self):
    req = NodeRequest(1, [], False)
    return req

  def getMasterCommands(self, serviceDict):
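    """Return the namenode commands to run on the master node: a '-format'
    command first if formatting was requested, then the namenode itself,
    started with '-upgrade' when upgrading."""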
    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

  def getAdminCommands(self, serviceDict):
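    """Return dfsadmin commands for an upgrade: wait for the namenode to leave
    safe mode, then finalize the upgrade. Generated only once per instance."""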
    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                         True, True))
    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
    cmdDesc = self._getDataNodeCommand()
    return [cmdDesc]

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def setMasterParams(self, list):
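    """Parse the key=value list reported by the namenode and record the
    namenode address, the master host and the web UI (info) address."""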
    dict = self._parseEquals(list)
    self.masterAddr = dict['fs.default.name']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
    else:
      # After HADOOP-2185
      self.infoAddr = dict['dfs.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)

  def _getNameNodePort(self):
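    """Return the namenode port from the fs.default.name attribute, or a
    fresh unique port if the attribute is not set."""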
    sd = self.serviceDesc
    attrs = sd.getfinalAttrs()
    if 'fs.default.name' not in attrs:
      return ServiceUtil.getUniqPort()

    v = attrs['fs.default.name']
    try:
      [n, p] = v.split(':', 1)
      return int(p)
    except:
      print get_exception_string()
      raise ValueError, "Can't find port from attr fs.default.name: %s" % (v)

  def _getNameNodeInfoPort(self):
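    """Return the namenode web UI (info) port from the version-appropriate
    attribute, or a fresh unique port if it is not configured."""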
    sd = self.serviceDesc
    attrs = sd.getfinalAttrs()
    if self.version < 16:
      if 'dfs.info.bindAddress' not in attrs:
        return ServiceUtil.getUniqPort()
    else:
      if 'dfs.http.address' not in attrs:
        return ServiceUtil.getUniqPort()

    if self.version < 16:
      p = attrs['dfs.info.port']
    else:
      p = attrs['dfs.http.address'].split(':')[1]

    try:
      return int(p)
    except:
      print get_exception_string()
      if self.version < 16:
        raise ValueError, "Can't find port from attr dfs.info.port: %s" % (p)
      else:
        raise ValueError, "Can't find port from attr dfs.http.address: %s" % (p)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
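    """Create per-service work directories under each parent dir and point
    dfs.name.dir, dfs.data.dir and hadoop.tmp.dir at them."""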
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'dfs-data')
      datadir.append(dir)
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')

      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    # FIXME -- change dfs.client.buffer.dir
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

  def _getNameNodeCommand(self, format=False, upgrade=False):
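    """Build the CommandDesc that starts the namenode, optionally as a
    '-format' or '-upgrade' invocation, using 'fillinhostport'/'fillinport'
    placeholders for any address attributes that are not preset."""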
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    #self.masterPort = port = self._getNameNodePort()
    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    #self.infoPort = port = self._getNameNodeInfoPort()
    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing HADOOP-2185, added the following. Earlier versions don't
      # care about this.
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'namenode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if format:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'

    cmd = CommandDesc(dict)
    return cmd

  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
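    """Build the CommandDesc for a 'hadoop dfsadmin' command run against the
    current namenode, optionally run in the foreground (wait) and optionally
    tolerating failures."""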
    sd = self.serviceDesc
    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr
    if nn is None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'dfsadmin' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['dfsadmin']
    argv.append(adminCommand)
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if wait:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    if ignoreFailures:
      dict['ignorefailures'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getDataNodeCommand(self):
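    """Build the CommandDesc that starts a datanode pointed at the current
    namenode, using version-appropriate placeholder port attributes when
    they are not preset."""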
    sd = self.serviceDesc
    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr
    if nn is None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # Adding the following for HADOOP-2185.
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    dict = { 'name' : 'datanode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['datanode']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()

    cmd = CommandDesc(dict)
    return cmd