
HADOOP-3695. Provide an ability to start multiple workers per node. Contributed by Vinod Kumar Vavilapalli

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@675194 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 17 years ago
Commit 1545eeaec5

+ 33 - 11
docs/changes.html

@@ -95,7 +95,7 @@ naming convention,such as, hadoop.rm.queue.queue-name.property-name.<br />(Heman
    </ol>
  </li>
  <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._improvements_')">  IMPROVEMENTS
-</a>&nbsp;&nbsp;&nbsp;(6)
+</a>&nbsp;&nbsp;&nbsp;(7)
    <ol id="trunk_(unreleased_changes)_._improvements_">
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3577">HADOOP-3577</a>. Tools to inject blocks into name node and simulated
data nodes for testing.<br />(Sanjay Radia via hairong)</li>
@@ -106,6 +106,8 @@ Loughran via omalley)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3543">HADOOP-3543</a>. Update the copyright year to 2008.<br />(cdouglas via omalley)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3587">HADOOP-3587</a>. Add a unit test for the contrib/data_join framework.<br />(cdouglas)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3402">HADOOP-3402</a>. Add terasort example program<br />(omalley)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3660">HADOOP-3660</a>. Add replication factor for injecting blocks in simulated
+datanodes.<br />(Sanjay Radia via cdouglas)</li>
    </ol>
  </li>
  <li><a href="javascript:toggleList('trunk_(unreleased_changes)_._optimizations_')">  OPTIMIZATIONS
@@ -130,7 +132,7 @@ omalley)</li>
</a></h2>
<ul id="release_0.18.0_-_unreleased_">
  <li><a href="javascript:toggleList('release_0.18.0_-_unreleased_._incompatible_changes_')">  INCOMPATIBLE CHANGES
-</a>&nbsp;&nbsp;&nbsp;(22)
+</a>&nbsp;&nbsp;&nbsp;(23)
    <ol id="release_0.18.0_-_unreleased_._incompatible_changes_">
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-2703">HADOOP-2703</a>.  The default options to fsck skips checking files
that are being written to. The output of fsck is incompatible
@@ -202,6 +204,9 @@ of the keytype if the type does not define a WritableComparator. Calling
the superclass compare will throw a NullPointerException. Also define
a RawComparator for NullWritable and permit it to be written as a key
to SequenceFiles.<br />(cdouglas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3673">HADOOP-3673</a>. Avoid deadlock caused by DataNode RPC receoverBlock().
+(Tsz Wo (Nicholas), SZE via rangadi)
+</li>
    </ol>
  </li>
  <li><a href="javascript:toggleList('release_0.18.0_-_unreleased_._new_features_')">  NEW FEATURES
@@ -378,7 +383,7 @@ InputFormat.validateInput.<br />(tomwhite via omalley)</li>
    </ol>
  </li>
  <li><a href="javascript:toggleList('release_0.18.0_-_unreleased_._bug_fixes_')">  BUG FIXES
-</a>&nbsp;&nbsp;&nbsp;(115)
+</a>&nbsp;&nbsp;&nbsp;(117)
    <ol id="release_0.18.0_-_unreleased_._bug_fixes_">
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-2905">HADOOP-2905</a>. 'fsck -move' triggers NPE in NameNode.<br />(Lohit Vjayarenu via rangadi)</li>
      <li>Increment ClientProtocol.versionID missed by <a href="http://issues.apache.org/jira/browse/HADOOP-2585">HADOOP-2585</a>.<br />(shv)</li>
@@ -605,33 +610,50 @@ conform to style guidelines.<br />(Amareshwari Sriramadasu via cdouglas)</li>
classpath jars.<br />(Brice Arnould via nigel)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3692">HADOOP-3692</a>. Fix documentation for Cluster setup and Quick start guides.<br />(Amareshwari Sriramadasu via ddas)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3691">HADOOP-3691</a>. Fix streaming and tutorial docs.<br />(Jothi Padmanabhan via ddas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3630">HADOOP-3630</a>. Fix NullPointerException in CompositeRecordReader from empty
+sources<br />(cdouglas)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3706">HADOOP-3706</a>. Fix a ClassLoader issue in the mapred.join Parser that
+prevents it from loading user-specified InputFormats.<br />(Jingkei Ly via cdouglas)</li>
    </ol>
  </li>
</ul>
<h2><a href="javascript:toggleList('older')">Older Releases</a></h2>
<ul id="older">
-<h3><a href="javascript:toggleList('release_0.17.1_-_unreleased_')">Release 0.17.1 - Unreleased
+<h3><a href="javascript:toggleList('release_0.17.2_-_unreleased_')">Release 0.17.2 - Unreleased
+</a></h3>
+<ul id="release_0.17.2_-_unreleased_">
+  <li><a href="javascript:toggleList('release_0.17.2_-_unreleased_._bug_fixes_')">  BUG FIXES
+</a>&nbsp;&nbsp;&nbsp;(3)
+    <ol id="release_0.17.2_-_unreleased_._bug_fixes_">
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3681">HADOOP-3681</a>. DFSClient can get into an infinite loop while closing
+a file if there are some errors.<br />(Lohit Vijayarenu via rangadi)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3002">HADOOP-3002</a>. Hold off block removal while in safe mode.<br />(shv)</li>
+      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3685">HADOOP-3685</a>. Unbalanced replication target.<br />(hairong)</li>
+    </ol>
+  </li>
+</ul>
+<h3><a href="javascript:toggleList('release_0.17.1_-_2008-06-23_')">Release 0.17.1 - 2008-06-23
</a></h3>
-<ul id="release_0.17.1_-_unreleased_">
-  <li><a href="javascript:toggleList('release_0.17.1_-_unreleased_._incompatible_changes_')">  INCOMPATIBLE CHANGES
+<ul id="release_0.17.1_-_2008-06-23_">
+  <li><a href="javascript:toggleList('release_0.17.1_-_2008-06-23_._incompatible_changes_')">  INCOMPATIBLE CHANGES
</a>&nbsp;&nbsp;&nbsp;(1)
-    <ol id="release_0.17.1_-_unreleased_._incompatible_changes_">
+    <ol id="release_0.17.1_-_2008-06-23_._incompatible_changes_">
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3565">HADOOP-3565</a>. Fix the Java serialization, which is not enabled by
default, to clear the state of the serializer between objects.<br />(tomwhite via omalley)</li>
    </ol>
  </li>
-  <li><a href="javascript:toggleList('release_0.17.1_-_unreleased_._improvements_')">  IMPROVEMENTS
+  <li><a href="javascript:toggleList('release_0.17.1_-_2008-06-23_._improvements_')">  IMPROVEMENTS
</a>&nbsp;&nbsp;&nbsp;(2)
-    <ol id="release_0.17.1_-_unreleased_._improvements_">
+    <ol id="release_0.17.1_-_2008-06-23_._improvements_">
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3522">HADOOP-3522</a>. Improve documentation on reduce pointing out that
input keys and values will be reused.<br />(omalley)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3487">HADOOP-3487</a>. Balancer uses thread pools for managing its threads;
therefore provides better resource management.<br />(hairong)</li>
    </ol>
  </li>
-  <li><a href="javascript:toggleList('release_0.17.1_-_unreleased_._bug_fixes_')">  BUG FIXES
+  <li><a href="javascript:toggleList('release_0.17.1_-_2008-06-23_._bug_fixes_')">  BUG FIXES
</a>&nbsp;&nbsp;&nbsp;(14)
-    <ol id="release_0.17.1_-_unreleased_._bug_fixes_">
+    <ol id="release_0.17.1_-_2008-06-23_._bug_fixes_">
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-2159">HADOOP-2159</a> Namenode stuck in safemode. The counter blockSafe should
not be decremented for invalid blocks.<br />(hairong)</li>
      <li><a href="http://issues.apache.org/jira/browse/HADOOP-3472">HADOOP-3472</a> MapFile.Reader getClosest() function returns incorrect results

+ 19 - 4
docs/hod_config_guide.html

@@ -384,7 +384,8 @@ document.write("Last Published: " + document.lastModified);
          
<li>work-dirs: Comma-separated list of paths that will serve
                       as the root for directories that HOD generates and passes
-                       to Hadoop for use to store DFS and Map/Reduce data. For e.g.
+                       to Hadoop for use to store DFS and Map/Reduce data. For
+                       example,
                       this is where DFS data blocks will be stored. Typically,
                       as many paths are specified as there are disks available
                       to ensure all disks are being utilized. The restrictions
@@ -406,9 +407,23 @@ document.write("Last Published: " + document.lastModified);
                       successful allocation even in the presence of a few bad
                       nodes in the cluster.
                       </li>
+          
+<li>workers_per_ring: Number of workers per service per HodRing.
+                       By default this is set to 1. If this configuration
+                       variable is set to a value 'n', the HodRing will run
+                       'n' instances of the workers (TaskTrackers or DataNodes)
+                       on each node acting as a slave. This can be used to run
+                       multiple workers per HodRing, so that the total number of
+                       workers  in a HOD cluster is not limited by the total
+                       number of nodes requested during allocation. However, note
+                       that this will mean each worker should be configured to use
+                       only a proportional fraction of the capacity of the 
+                       resources on the node. In general, this feature is only
+                       useful for testing and simulation purposes, and not for
+                       production use.</li>
        
</ul>
-<a name="N100A5"></a><a name="3.5+gridservice-hdfs+options"></a>
+<a name="N100A8"></a><a name="3.5+gridservice-hdfs+options"></a>
<h3 class="h4">3.5 gridservice-hdfs options</h3>
<ul>
          
@@ -449,7 +464,7 @@ document.write("Last Published: " + document.lastModified);
<li>final-server-params: Same as above, except they will be marked final.</li>
        
</ul>
-<a name="N100C4"></a><a name="3.6+gridservice-mapred+options"></a>
+<a name="N100C7"></a><a name="3.6+gridservice-mapred+options"></a>
<h3 class="h4">3.6 gridservice-mapred options</h3>
<ul>
          
@@ -482,7 +497,7 @@ document.write("Last Published: " + document.lastModified);
<li>final-server-params: Same as above, except they will be marked final.</li>
        
</ul>
-<a name="N100E3"></a><a name="3.7+hodring+options"></a>
+<a name="N100E6"></a><a name="3.7+hodring+options"></a>
<h3 class="h4">3.7 hodring options</h3>
<ul>
          

File diff suppressed because it is too large
+ 3 - 3
docs/hod_config_guide.pdf


+ 3 - 0
src/contrib/hod/CHANGES.txt

@@ -6,6 +6,9 @@ Trunk (unreleased changes)
 
  NEW FEATURES
 
+    HADOOP-3695. Provide an ability to start multiple workers per node.
+    (Vinod Kumar Vavilapalli via yhemanth)
+
  IMPROVEMENTS
 
  OPTIMIZATIONS

+ 9 - 1
src/contrib/hod/bin/hod

@@ -229,8 +229,10 @@ defList = { 'hod' : (
 
             ('max-master-failures', 'pos_int', 
              'Defines how many times a master can fail before' \
-              ' failing cluster allocation', False, 5, True, True)),
+              ' failing cluster allocation', False, 5, True, True),
 
+             ('workers_per_ring', 'pos_int', 'Defines number of workers per service per hodring',
+             False, 1, False, True)),
 
            'gridservice-mapred' : (
             ('external', 'bool', "Connect to an already running MapRed?",
@@ -517,6 +519,12 @@ if __name__ == '__main__':
          "The log destiniation uri must be of type local:// or hdfs://."))
        sys.exit(1)
  
+    if hodConfig['ringmaster']['workers_per_ring'] < 1:
+      printErrors(hodConfig.var_error('ringmaster', 'workers_per_ring',
+                "ringmaster.workers_per_ring must be a positive integer " +
+                "greater than or equal to 1"))
+      sys.exit(1)
+                        
    ## TODO : end of should move the dependency verification to hodConfig.verif
      
    hodConfig['hod']['base-dir'] = rootDirectory

+ 4 - 1
src/contrib/hod/bin/ringmaster

@@ -117,7 +117,10 @@ defList = { 'ringmaster' : (
 
             ('max-master-failures', 'pos_int', 
              'Defines how many times a master can fail before' \
-              ' failing cluster allocation', False, 5, True, True)),
+              ' failing cluster allocation', False, 5, True, True),
+
+             ('workers_per_ring', 'pos_int', 'Defines number of workers per service per hodring',
+              False, 1, False, True)),
 
            'resource_manager' : (
             ('id', 'string', 'Batch scheduler ID: torque|condor.',

+ 18 - 5
src/contrib/hod/hodlib/GridServices/hdfs.py

@@ -76,7 +76,8 @@ class HdfsExternal(MasterSlave):
 
class Hdfs(MasterSlave):
 
  def __init__(self, serviceDesc, nodePool, required_node, version, \
-                                        format=True, upgrade=False):
+                                        format=True, upgrade=False,
+                                        workers_per_ring = 1):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
@@ -87,6 +88,7 @@ class Hdfs(MasterSlave):
    self.upgrade = upgrade
    self.workers = []
    self.version = version
+    self.workers_per_ring = workers_per_ring
 
  def getMasterRequest(self):
    req = NodeRequest(1, [], False)
@@ -117,8 +119,11 @@ class Hdfs(MasterSlave):
    return adminCommands
 
  def getWorkerCommands(self, serviceDict):
-    cmdDesc = self._getDataNodeCommand()
-    return [cmdDesc]
+    workerCmds = []
+    for id in range(1, self.workers_per_ring + 1):
+      workerCmds.append(self._getDataNodeCommand(str(id)))
+
+    return workerCmds
 
  def setMasterNodes(self, list):
    node = list[0]
@@ -250,7 +255,7 @@ class Hdfs(MasterSlave):
    cmd = CommandDesc(dict)
    return cmd
 
-  def _getDataNodeCommand(self):
+  def _getDataNodeCommand(self, id):
 
    sd = self.serviceDesc
 
@@ -282,6 +287,14 @@ class Hdfs(MasterSlave):
      # TODO: check for major as well as minor versions
      attrs['dfs.datanode.ipc.address'] = 'fillinhostport'
                    
+    # unique workdirs in case of multiple datanodes per hodring
+    pd = []
+    for dir in parentDirs:
+      dir = dir + "-" + id
+      pd.append(dir)
+    parentDirs = pd
+    # end of unique workdirs
+
    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')
 
    dict = { 'name' : 'datanode' }
@@ -292,7 +305,7 @@ class Hdfs(MasterSlave):
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
-
+ 
    cmd = CommandDesc(dict)
    return cmd
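
The "unique workdirs" block added to _getDataNodeCommand is the heart of running several DataNodes on one node: every parent directory is suffixed with the worker id so instances never collide on disk. A minimal standalone sketch of the same transformation (directory names are hypothetical):

# Suffix each parent dir with the worker id; equivalent to the loop above.
worker_id = "2"                               # second worker in the ring
parentDirs = ["/grid/0/hod", "/grid/1/hod"]   # hypothetical work-dirs roots
parentDirs = [d + "-" + worker_id for d in parentDirs]
print(parentDirs)  # ['/grid/0/hod-2', '/grid/1/hod-2']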
 

+ 17 - 4
src/contrib/hod/hodlib/GridServices/mapred.py

@@ -82,7 +82,8 @@ class MapReduceExternal(MasterSlave):
  
class MapReduce(MasterSlave):
 
-  def __init__(self, serviceDesc, workDirs,required_node, version):
+  def __init__(self, serviceDesc, workDirs,required_node, version,
+                workers_per_ring = 1):
    MasterSlave.__init__(self, serviceDesc, workDirs,required_node)
 
    self.masterNode = None
@@ -91,6 +92,7 @@ class MapReduce(MasterSlave):
    self.workers = []
    self.required_node = required_node
    self.version = version
+    self.workers_per_ring = workers_per_ring
 
  def isLaunchable(self, serviceDict):
    hdfs = serviceDict['hdfs']
@@ -116,8 +118,11 @@ class MapReduce(MasterSlave):
 
    hdfs = serviceDict['hdfs']
 
-    cmdDesc = self._getTaskTrackerCommand(hdfs)
-    return [cmdDesc]
+    workerCmds = []
+    for id in range(1, self.workers_per_ring + 1):
+      workerCmds.append(self._getTaskTrackerCommand(str(id), hdfs))
+      
+    return workerCmds
 
  def setMasterNodes(self, list):
    node = list[0]
@@ -217,7 +222,7 @@ class MapReduce(MasterSlave):
    cmd = CommandDesc(dict)
    return cmd
 
-  def _getTaskTrackerCommand(self, hdfs):
+  def _getTaskTrackerCommand(self, id, hdfs):
 
    sd = self.serviceDesc
 
@@ -245,6 +250,14 @@ class MapReduce(MasterSlave):
      if 'mapred.task.tracker.http.address' not in attrs:
        attrs['mapred.task.tracker.http.address'] = 'fillinhostport'
 
+    # unique parentDirs in case of multiple tasktrackers per hodring
+    pd = []
+    for dir in parentDirs:
+      dir = dir + "-" + id
+      pd.append(dir)
+    parentDirs = pd
+    # end of unique workdirs
+
    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')
 
    dict = { 'name' : 'tasktracker' }
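
getWorkerCommands now has the same fan-out shape in both services: one command descriptor per worker id, with ids starting at 1 so they can double as workdir suffixes. A distilled sketch of the pattern, where build_command stands in for the real _getDataNodeCommand/_getTaskTrackerCommand:

def get_worker_commands(workers_per_ring, build_command):
    # One command per worker id, 1..workers_per_ring inclusive.
    return [build_command(str(i)) for i in range(1, workers_per_ring + 1)]

print(get_worker_commands(3, lambda wid: "tasktracker-" + wid))
# ['tasktracker-1', 'tasktracker-2', 'tasktracker-3']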

+ 6 - 2
src/contrib/hod/hodlib/RingMaster/ringMaster.py

@@ -553,6 +553,8 @@ class RingMaster:
    self.__isStopped = False # to let main exit
    self.__exitCode = 0 # exit code with which the ringmaster main method should return
 
+    self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']
+
    self.__initialize_signal_handlers()
    
    sdd = self.cfg['servicedesc']
@@ -609,7 +611,8 @@ class RingMaster:
        hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
        hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
      else:
-        hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']))
+        hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']),
+                    workers_per_ring = self.workers_per_ring)
 
      self.serviceDict[hdfs.getName()] = hdfs
      
@@ -619,7 +622,8 @@ class RingMaster:
        mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
        mr.setMasterParams( self.cfg['gridservice-mapred'] )
      else:
-        mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']))
+        mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']),
+                       workers_per_ring = self.workers_per_ring)
 
      self.serviceDict[mr.getName()] = mr
    except:
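
A design choice worth noting here and in the two service constructors above: workers_per_ring defaults to 1, so any caller that omits the argument keeps the old single-worker behaviour. In miniature (a hypothetical stand-in class, not HOD code):

class Service:
    def __init__(self, desc, workers_per_ring=1):
        # Defaulting the new argument keeps old call sites working unchanged.
        self.desc = desc
        self.workers_per_ring = workers_per_ring

assert Service("hdfs").workers_per_ring == 1                      # old style
assert Service("hdfs", workers_per_ring=2).workers_per_ring == 2  # new style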

+ 3 - 4
src/contrib/hod/support/logcondense.py

@@ -113,11 +113,10 @@ def runcondense():
  # otherwise only JobTracker logs. Likewise, in case of dynamic dfs, we must also look for
  # deleting datanode logs
  filteredNames = ['jobtracker']
-  deletedNamePrefixes = ['0-tasktracker-*']
+  deletedNamePrefixes = ['*-tasktracker-*']
  if options.dynamicdfs == 'true':
    filteredNames.append('namenode')
-    deletedNamePrefixes.append('1-tasktracker-*')
-    deletedNamePrefixes.append('0-datanode-*')
+    deletedNamePrefixes.append('*-datanode-*')
 
  filepath = '%s/\*/hod-logs/' % (options.log)
  cmd = getDfsCommand(options, "-lsr " + filepath)
@@ -128,7 +127,7 @@ def runcondense():
    try:
      m = re.match("^.*\s(.*)\n$", line)
      filename = m.group(1)
-      # file name format: <prefix>/<user>/hod-logs/<jobid>/[0-1]-[jobtracker|tasktracker|datanode|namenode|]-hostname-YYYYMMDDtime-random.tar.gz
+      # file name format: <prefix>/<user>/hod-logs/<jobid>/[0-9]*-[jobtracker|tasktracker|datanode|namenode|]-hostname-YYYYMMDDtime-random.tar.gz
      # first strip prefix:
      if filename.startswith(options.log):
        filename = filename.lstrip(options.log)
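
The pattern change is needed because log bundles are now numbered by worker id rather than a fixed 0 or 1 prefix. A quick illustration with fnmatch (the file names are made up; logcondense itself matches against dfs -lsr output):

import fnmatch

names = ["0-tasktracker-host1-20080708-abc.tar.gz",
         "3-tasktracker-host1-20080708-def.tar.gz"]
print(fnmatch.filter(names, "0-tasktracker-*"))  # old pattern: first bundle only
print(fnmatch.filter(names, "*-tasktracker-*"))  # new pattern: every worker's bundle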

+ 62 - 23
src/contrib/hod/testing/testRingmasterRPCs.py

@@ -30,6 +30,28 @@ from hodlib.GridServices import *
from hodlib.Common.desc import ServiceDesc
from hodlib.RingMaster.ringMaster import _LogMasterSources
 
+configuration = {
+       'hod': {}, 
+      'resource_manager': {
+                            'id': 'torque', 
+                            'batch-home': '/home/y/'
+                          }, 
+       'ringmaster': {
+                      'max-connect' : 2,
+                      'max-master-failures' : 5
+                     }, 
+       'hodring': {
+                  }, 
+       'gridservice-mapred': { 
+                              'id': 'mapred' 
+                             } ,
+       'gridservice-hdfs': { 
+                              'id': 'hdfs' 
+                            }, 
+       'servicedesc' : {} ,
+       'nodepooldesc': {} , 
+       }
+
# All test-case classes should have the naming convention test_.*
class test_MINITEST1(unittest.TestCase):
  def setUp(self):
@@ -45,12 +67,49 @@ class test_MINITEST1(unittest.TestCase):
  def tearDown(self):
    pass
 
-class test_MINITEST2(unittest.TestCase):
+class test_Multiple_Workers(unittest.TestCase):
  def setUp(self):
+    self.config = configuration
+    self.config['ringmaster']['workers_per_ring'] = 2
+
+    hdfsDesc = self.config['servicedesc']['hdfs'] = ServiceDesc(self.config['gridservice-hdfs'])
+    mrDesc = self.config['servicedesc']['mapred'] = ServiceDesc(self.config['gridservice-mapred'])
+
+    self.hdfs = Hdfs(hdfsDesc, [], 0, 19, workers_per_ring = \
+                                 self.config['ringmaster']['workers_per_ring'])
+    self.mr = MapReduce(mrDesc, [],1, 19, workers_per_ring = \
+                                 self.config['ringmaster']['workers_per_ring'])
+    
+    self.log = logging.getLogger()
    pass
 
  # All testMethods have to have their names start with 'test'
-  def testSuccess(self):
+  def testWorkersCount(self):
+    self.serviceDict = {}
+    self.serviceDict[self.hdfs.getName()] = self.hdfs
+    self.serviceDict[self.mr.getName()] = self.mr
+    self.rpcSet = _LogMasterSources(self.serviceDict, self.config, None, self.log, None)
+
+    cmdList = self.rpcSet.getCommand('host1')
+    self.assertEquals(len(cmdList), 2)
+    self.assertEquals(cmdList[0].dict['argv'][0], 'namenode')
+    self.assertEquals(cmdList[1].dict['argv'][0], 'namenode')
+    addParams = ['fs.default.name=host1:51234', 'dfs.http.address=host1:5125' ]
+    self.rpcSet.addMasterParams('host1', addParams)
+    # print "NN is launched"
+
+    cmdList = self.rpcSet.getCommand('host2')
+    self.assertEquals(len(cmdList), 1)
+    self.assertEquals(cmdList[0].dict['argv'][0], 'jobtracker')
+    addParams = ['mapred.job.tracker=host2:51236',
+                 'mapred.job.tracker.http.address=host2:51237']
+    self.rpcSet.addMasterParams('host2', addParams)
+    # print "JT is launched"
+
+    cmdList = self.rpcSet.getCommand('host3')
+    # Verify the workers count per ring : TTs + DNs
+    self.assertEquals(len(cmdList),
+                      self.config['ringmaster']['workers_per_ring'] * 2)
    pass
    
  def testFailure(self):
@@ -61,27 +120,7 @@ class test_MINITEST2(unittest.TestCase):
 
class test_GetCommand(unittest.TestCase):
  def setUp(self):
-    self.config = {
-       'hod': {}, 
-      'resource_manager': {
-                            'id': 'torque', 
-                            'batch-home': '/home/y/'
-                          }, 
-       'ringmaster': {
-                      'max-connect' : 2,
-                      'max-master-failures' : 5
-                     }, 
-       'hodring': {
-                  }, 
-       'gridservice-mapred': { 
-                              'id': 'mapred' 
-                             } ,
-       'gridservice-hdfs': { 
-                              'id': 'hdfs' 
-                            }, 
-       'servicedesc' : {} ,
-       'nodepooldesc': {} , 
-       }
+    self.config = configuration
 
    hdfsDesc = self.config['servicedesc']['hdfs'] = ServiceDesc(self.config['gridservice-hdfs'])
    mrDesc = self.config['servicedesc']['mapred'] = ServiceDesc(self.config['gridservice-mapred'])
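
The key assertion in the new test is the command-count arithmetic: a slave HodRing receives one start command per worker per service, and with dynamic DFS there are two services (TaskTrackers plus DataNodes). Isolated as a sketch:

workers_per_ring = 2   # as set in test_Multiple_Workers.setUp above
services = 2           # TaskTrackers + DataNodes
assert workers_per_ring * services == 4  # expected len(cmdList) on a slave node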

+ 15 - 1
src/docs/src/documentation/content/xdocs/hod_config_guide.xml

@@ -156,7 +156,8 @@
        <ul>
          <li>work-dirs: Comma-separated list of paths that will serve
                       as the root for directories that HOD generates and passes
-                       to Hadoop for use to store DFS and Map/Reduce data. For e.g.
+                       to Hadoop for use to store DFS and Map/Reduce data. For
+                       example,
                       this is where DFS data blocks will be stored. Typically,
                       as many paths are specified as there are disks available
                       to ensure all disks are being utilized. The restrictions
@@ -177,6 +178,19 @@
                       successful allocation even in the presence of a few bad
                       nodes in the cluster.
                       </li>
+          <li>workers_per_ring: Number of workers per service per HodRing.
+                       By default this is set to 1. If this configuration
+                       variable is set to a value 'n', the HodRing will run
+                       'n' instances of the workers (TaskTrackers or DataNodes)
+                       on each node acting as a slave. This can be used to run
+                       multiple workers per HodRing, so that the total number of
+                       workers  in a HOD cluster is not limited by the total
+                       number of nodes requested during allocation. However, note
+                       that this will mean each worker should be configured to use
+                       only a proportional fraction of the capacity of the 
+                       resources on the node. In general, this feature is only
+                       useful for testing and simulation purposes, and not for
+                       production use.</li>
        </ul>
      </section>
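
To make the scaling behaviour described above concrete, here is a small hypothetical sketch (not part of the commit) of how workers_per_ring multiplies the worker daemons started on the allocated slave nodes:

def total_workers(slave_nodes, workers_per_ring, dynamic_dfs=True):
    # Each slave HodRing starts workers_per_ring TaskTrackers, plus
    # workers_per_ring DataNodes when HOD provisions DFS dynamically.
    services = 2 if dynamic_dfs else 1
    return slave_nodes * workers_per_ring * services

print(total_workers(3, 2))  # 3 slaves x 2 workers x 2 services = 12 workers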
      

Some files were not shown because too many files changed in this diff