Browse Source

Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1292838 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 years ago
parent
commit
f3026e1085
50 changed files with 1086 additions and 413 deletions
  1. 4 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 1 1
      hadoop-common-project/hadoop-common/pom.xml
  3. 13 4
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  4. 1 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  5. 0 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
  6. 71 29
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  7. 15 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
  8. 34 69
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  9. 12 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
  10. 26 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
  11. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  12. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  13. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java
  14. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
  15. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
  16. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
  17. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java
  18. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
  19. 7 7
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
  20. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
  21. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLargeBlock.java
  22. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
  23. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
  24. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSetrepIncreasing.java
  25. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
  26. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSmallBlock.java
  27. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
  28. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java
  29. 37 43
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  30. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
  31. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
  32. 16 4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
  33. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileLimit.java
  34. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
  35. 4 0
      hadoop-mapreduce-project/CHANGES.txt
  36. 3 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
  37. 3 3
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
  38. 26 18
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java
  39. 43 6
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
  40. 9 0
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
  41. 22 5
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java
  42. 115 70
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java
  43. 39 10
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
  44. 92 33
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
  45. 196 56
      hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
  46. 217 0
      hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java
  47. 3 2
      hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
  48. 22 7
      hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java
  49. 3 2
      hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java
  50. 16 0
      hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml

+ 4 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -178,6 +178,10 @@ Release 0.23.2 - UNRELEASED
 
   BUG FIXES
 
+    HADOOP-7660. Maven generated .classpath doesnot includes 
+    "target/generated-test-source/java" as source directory.
+    (Laxman via bobby)
+
     HADOOP-8042  When copying a file out of HDFS, modifying it, and uploading
     it back into HDFS, the put fails due to a CRC mismatch
     (Daryn Sharp via bobby)

+ 1 - 1
hadoop-common-project/hadoop-common/pom.xml

@@ -480,7 +480,7 @@
           </execution>
           <execution>
             <id>add-test-source</id>
-            <phase>generate-test-sources</phase>
+            <phase>generate-sources</phase>
             <goals>
               <goal>add-test-source</goal>
             </goals>

+ 13 - 4
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -209,10 +209,6 @@ Trunk (unreleased changes)
     HDFS-2908. Add apache license header for StorageReport.java. (Brandon Li
     via jitendra)
 
-    HDFS-2944. Typo in hdfs-default.xml causes
-    dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
-    disabled. (atm)
-
     HDFS-2968. Protocol translator for BlockRecoveryCommand broken when
     multiple blocks need recovery. (todd)
 
@@ -238,6 +234,11 @@ Release 0.23.2 - UNRELEASED
     HDFS-2725. hdfs script usage information is missing the information 
     about "dfs" command (Prashant Sharma via stevel)
 
+    HDFS-2907.  Add a conf property dfs.datanode.fsdataset.factory to make
+    FSDataset in Datanode pluggable.  (szetszwo)
+
+    HDFS-2985. Improve logging when replicas are marked as corrupt. (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -265,6 +266,14 @@ Release 0.23.2 - UNRELEASED
 
     HDFS-2969. ExtendedBlock.equals is incorrectly implemented (todd)
 
+    HDFS-2944. Typo in hdfs-default.xml causes
+    dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
+    disabled. (atm)
+
+    HDFS-2981. In hdfs-default.xml, the default value of
+    dfs.client.block.write.replace-datanode-on-failure.enable should be true.
+    (szetszwo)
+
 Release 0.23.1 - 2012-02-17 
 
   INCOMPATIBLE CHANGES

+ 1 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -192,7 +192,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_CLIENT_RETRY_WINDOW_BASE= "dfs.client.retry.window.base";
   public static final String  DFS_METRICS_SESSION_ID_KEY = "dfs.metrics.session-id";
   public static final String  DFS_DATANODE_HOST_NAME_KEY = "dfs.datanode.hostname";
-  public static final String  DFS_DATANODE_STORAGEID_KEY = "dfs.datanode.StorageId";
   public static final String  DFS_NAMENODE_HOSTS_KEY = "dfs.namenode.hosts";
   public static final String  DFS_NAMENODE_HOSTS_EXCLUDE_KEY = "dfs.namenode.hosts.exclude";
   public static final String  DFS_CLIENT_SOCKET_TIMEOUT_KEY = "dfs.client.socket-timeout";
@@ -236,10 +235,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
   public static final String  DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
-  public static final String  DFS_DATANODE_SIMULATEDDATASTORAGE_KEY = "dfs.datanode.simulateddatastorage";
-  public static final boolean DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT = false;
-  public static final String  DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_KEY = "dfs.datanode.simulateddatastorage.capacity";
-  public static final long    DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_DEFAULT = 2L<<40;
   public static final String  DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
   public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
   public static final String  DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY = "dfs.datanode.block.volume.choice.policy";
@@ -307,6 +302,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
+  public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
   public static final String  DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY = "dfs.datanode.socket.write.timeout";
   public static final String  DFS_DATANODE_STARTUP_KEY = "dfs.datanode.startup";
   public static final String  DFS_NAMENODE_PLUGINS_KEY = "dfs.namenode.plugins";

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java

@@ -88,7 +88,6 @@ public class HdfsConfiguration extends Configuration {
     deprecate("fs.checkpoint.period", DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY);
     deprecate("dfs.upgrade.permission", DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY);
     deprecate("heartbeat.recheck.interval", DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY);
-    deprecate("StorageId", DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY);
     deprecate("dfs.https.client.keystore.resource", DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY);
     deprecate("dfs.https.need.client.auth", DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY);
     deprecate("slave.host.name", DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);

+ 71 - 29
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -895,9 +895,11 @@ public class BlockManager {
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
+   * @param reason a textual reason why the block should be marked corrupt,
+   * for logging purposes
    */
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
-      final DatanodeInfo dn) throws IOException {
+      final DatanodeInfo dn, String reason) throws IOException {
     assert namesystem.hasWriteLock();
     final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
     if (storedBlock == null) {
@@ -909,11 +911,12 @@ public class BlockManager {
           + blk + " not found.");
       return;
     }
-    markBlockAsCorrupt(storedBlock, dn);
+    markBlockAsCorrupt(storedBlock, dn, reason);
   }
 
   private void markBlockAsCorrupt(BlockInfo storedBlock,
-                                  DatanodeInfo dn) throws IOException {
+                                  DatanodeInfo dn,
+                                  String reason) throws IOException {
     assert storedBlock != null : "storedBlock should not be null";
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -937,7 +940,7 @@ public class BlockManager {
     node.addBlock(storedBlock);
 
     // Add this replica to corruptReplicas Map
-    corruptReplicas.addToCorruptReplicasMap(storedBlock, node);
+    corruptReplicas.addToCorruptReplicasMap(storedBlock, node, reason);
     if (countNodes(storedBlock).liveReplicas() >= inode.getReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(storedBlock, node);
@@ -1380,6 +1383,21 @@ public class BlockManager {
       this.reportedState = reportedState;
     }
   }
+  
+  /**
+   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+   * list of blocks that should be considered corrupt due to a block report.
+   */
+  private static class BlockToMarkCorrupt {
+    final BlockInfo blockInfo;
+    final String reason;
+    
+    BlockToMarkCorrupt(BlockInfo blockInfo, String reason) {
+      super();
+      this.blockInfo = blockInfo;
+      this.reason = reason;
+    }
+  }
 
   /**
    * The given datanode is reporting all its blocks.
@@ -1478,7 +1496,7 @@ public class BlockManager {
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toRemove = new LinkedList<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
     reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
 
@@ -1498,8 +1516,8 @@ public class BlockManager {
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
-    for (BlockInfo b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+    for (BlockToMarkCorrupt b : toCorrupt) {
+      markBlockAsCorrupt(b.blockInfo, node, b.reason);
     }
   }
 
@@ -1540,14 +1558,16 @@ public class BlockManager {
       
       // If block is corrupt, mark it and continue to next block.
       BlockUCState ucState = storedBlock.getBlockUCState();
-      if (isReplicaCorrupt(iblk, reportedState, storedBlock, ucState, node)) {
+      BlockToMarkCorrupt c = checkReplicaCorrupt(
+          iblk, reportedState, storedBlock, ucState, node);
+      if (c != null) {
         if (namesystem.isInStandbyState()) {
           // In the Standby, we may receive a block report for a file that we
           // just have an out-of-date gen-stamp or state for, for example.
           queueReportedBlock(node, iblk, reportedState,
               QUEUE_REASON_CORRUPT_STATE);
         } else {
-          markBlockAsCorrupt(storedBlock, node);
+          markBlockAsCorrupt(c.blockInfo, node, c.reason);
         }
         continue;
       }
@@ -1570,7 +1590,7 @@ public class BlockManager {
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
-      Collection<BlockInfo> toCorrupt,      // add to corrupt replicas list
+      Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
@@ -1638,7 +1658,7 @@ public class BlockManager {
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
-      final Collection<BlockInfo> toCorrupt,
+      final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
     
     if(LOG.isDebugEnabled()) {
@@ -1677,16 +1697,17 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       return storedBlock;
     }
 
-    if (isReplicaCorrupt(block, reportedState, storedBlock, ucState, dn)) {
+    BlockToMarkCorrupt c = checkReplicaCorrupt(
+        block, reportedState, storedBlock, ucState, dn);
+    if (c != null) {
       if (namesystem.isInStandbyState()) {
         // If the block is an out-of-date generation stamp or state,
         // but we're the standby, we shouldn't treat it as corrupt,
         // but instead just queue it for later processing.
         queueReportedBlock(dn, storedBlock, reportedState,
             QUEUE_REASON_CORRUPT_STATE);
-
       } else {
-        toCorrupt.add(storedBlock);
+        toCorrupt.add(c);
       }
       return storedBlock;
     }
@@ -1773,8 +1794,11 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    * as switch statements, on the theory that it is easier to understand
    * the combinatorics of reportedState and ucState that way.  It should be
    * at least as efficient as boolean expressions.
+   * 
+   * @return a BlockToMarkCorrupt object, or null if the replica is not corrupt
    */
-  private boolean isReplicaCorrupt(Block iblk, ReplicaState reportedState, 
+  private BlockToMarkCorrupt checkReplicaCorrupt(
+      Block iblk, ReplicaState reportedState, 
       BlockInfo storedBlock, BlockUCState ucState, 
       DatanodeDescriptor dn) {
     switch(reportedState) {
@@ -1782,17 +1806,31 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       switch(ucState) {
       case COMPLETE:
       case COMMITTED:
-        return (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()
-            || storedBlock.getNumBytes() != iblk.getNumBytes());
+        if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
+          return new BlockToMarkCorrupt(storedBlock,
+              "block is " + ucState + " and reported genstamp " +
+              iblk.getGenerationStamp() + " does not match " +
+              "genstamp in block map " + storedBlock.getGenerationStamp());
+        } else if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
+          return new BlockToMarkCorrupt(storedBlock,
+              "block is " + ucState + " and reported length " +
+              iblk.getNumBytes() + " does not match " +
+              "length in block map " + storedBlock.getNumBytes());
+        } else {
+          return null; // not corrupt
+        }
       default:
-        return false;
+        return null;
       }
     case RBW:
     case RWR:
       if (!storedBlock.isComplete()) {
-        return false;
+        return null; // not corrupt
       } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
-        return true;
+        return new BlockToMarkCorrupt(storedBlock,
+            "reported " + reportedState + " replica with genstamp " +
+            iblk.getGenerationStamp() + " does not match COMPLETE block's " +
+            "genstamp in block map " + storedBlock.getGenerationStamp());
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -1802,18 +1840,22 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
           LOG.info("Received an RBW replica for block " + storedBlock +
               " on " + dn.getName() + ": ignoring it, since the block is " +
               "complete with the same generation stamp.");
-          return false;
+          return null;
         } else {
-          return true;
+          return new BlockToMarkCorrupt(storedBlock,
+              "reported replica has invalid state " + reportedState);
         }
       }
     case RUR:       // should not be reported
     case TEMPORARY: // should not be reported
     default:
-      LOG.warn("Unexpected replica state " + reportedState
-          + " for block: " + storedBlock + 
-          " on " + dn.getName() + " size " + storedBlock.getNumBytes());
-      return true;
+      String msg = "Unexpected replica state " + reportedState
+      + " for block: " + storedBlock + 
+      " on " + dn.getName() + " size " + storedBlock.getNumBytes();
+      // log here at WARN level since this is really a broken HDFS
+      // invariant
+      LOG.warn(msg);
+      return new BlockToMarkCorrupt(storedBlock, msg);
     }
   }
 
@@ -2406,7 +2448,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     // blockReceived reports a finalized block
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockInfo> toCorrupt = new LinkedList<BlockInfo>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
     processReportedBlock(node, block, reportedState,
                               toAdd, toInvalidate, toCorrupt, toUC);
@@ -2427,8 +2469,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
           + " does not belong to any file.");
       addToInvalidates(b, node);
     }
-    for (BlockInfo b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
+    for (BlockToMarkCorrupt b : toCorrupt) {
+      markBlockAsCorrupt(b.blockInfo, node, b.reason);
     }
   }
 

+ 15 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java

@@ -44,25 +44,37 @@ public class CorruptReplicasMap{
    *
    * @param blk Block to be added to CorruptReplicasMap
    * @param dn DatanodeDescriptor which holds the corrupt replica
+   * @param reason a textual reason (for logging purposes)
    */
-  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn) {
+  public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+      String reason) {
     Collection<DatanodeDescriptor> nodes = getNodes(blk);
     if (nodes == null) {
       nodes = new TreeSet<DatanodeDescriptor>();
       corruptReplicasMap.put(blk, nodes);
     }
+    
+    String reasonText;
+    if (reason != null) {
+      reasonText = " because " + reason;
+    } else {
+      reasonText = "";
+    }
+    
     if (!nodes.contains(dn)) {
       nodes.add(dn);
       NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                    blk.getBlockName() +
                                    " added as corrupt on " + dn.getName() +
-                                   " by " + Server.getRemoteIp());
+                                   " by " + Server.getRemoteIp() +
+                                   reasonText);
     } else {
       NameNode.stateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                    "duplicate requested for " + 
                                    blk.getBlockName() + " to add as corrupt " +
                                    "on " + dn.getName() +
-                                   " by " + Server.getRemoteIp());
+                                   " by " + Server.getRemoteIp() +
+                                   reasonText);
     }
   }
 

+ 34 - 69
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -43,10 +43,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
 
@@ -162,7 +159,6 @@ import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
@@ -301,13 +297,14 @@ public class DataNode extends Configured
     }
   }
 
-  private synchronized void setClusterId(String cid) throws IOException {
-    if(clusterId != null && !clusterId.equals(cid)) {
-      throw new IOException ("cluster id doesn't match. old cid=" + clusterId 
-          + " new cid="+ cid);
+  private synchronized void setClusterId(final String nsCid, final String bpid
+      ) throws IOException {
+    if(clusterId != null && !clusterId.equals(nsCid)) {
+      throw new IOException ("Cluster IDs not matched: dn cid=" + clusterId 
+          + " but ns cid="+ nsCid + "; bpid=" + bpid);
     }
     // else
-    clusterId = cid;    
+    clusterId = nsCid;
   }
 
   private static String getHostName(Configuration config)
@@ -752,51 +749,22 @@ public class DataNode extends Configured
    */
   void initBlockPool(BPOfferService bpos) throws IOException {
     NamespaceInfo nsInfo = bpos.getNamespaceInfo();
-    Preconditions.checkState(nsInfo != null,
-        "Block pool " + bpos + " should have retrieved " +
-        "its namespace info before calling initBlockPool.");
+    if (nsInfo == null) {
+      throw new IOException("NamespaceInfo not found: Block pool " + bpos
+          + " should have retrieved namespace info before initBlockPool.");
+    }
     
-    String blockPoolId = nsInfo.getBlockPoolID();
-
     // Register the new block pool with the BP manager.
     blockPoolManager.addBlockPool(bpos);
 
-    synchronized (this) {
-      // we do not allow namenode from different cluster to register
-      if(clusterId != null && !clusterId.equals(nsInfo.clusterID)) {
-        throw new IOException(
-            "cannot register with the namenode because clusterid do not match:"
-            + " nn=" + nsInfo.getBlockPoolID() + "; nn cid=" + nsInfo.clusterID + 
-            ";dn cid=" + clusterId);
-      }
-
-      setClusterId(nsInfo.clusterID);
-    }
-    
-    StartupOption startOpt = getStartupOption(conf);
-    assert startOpt != null : "Startup option must be set.";
-
-    boolean simulatedFSDataset = conf.getBoolean(
-        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
-    
-    if (!simulatedFSDataset) {
-      // read storage info, lock data dirs and transition fs state if necessary          
-      storage.recoverTransitionRead(DataNode.this, blockPoolId, nsInfo,
-          dataDirs, startOpt);
-      StorageInfo bpStorage = storage.getBPStorage(blockPoolId);
-      LOG.info("setting up storage: nsid=" +
-          bpStorage.getNamespaceID() + ";bpid="
-          + blockPoolId + ";lv=" + storage.getLayoutVersion() +
-          ";nsInfo=" + nsInfo);
-    }
+    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
     
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
-    initFsDataSet();
+    initStorage(nsInfo);
     initPeriodicScanners(conf);
     
-    data.addBlockPool(blockPoolId, conf);
+    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
   }
 
   /**
@@ -823,31 +791,28 @@ public class DataNode extends Configured
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
    */
-  private synchronized void initFsDataSet() throws IOException {
-    if (data != null) { // Already initialized
-      return;
-    }
-
-    // get version and id info from the name-node
-    boolean simulatedFSDataset = conf.getBoolean(
-        DFS_DATANODE_SIMULATEDDATASTORAGE_KEY,
-        DFS_DATANODE_SIMULATEDDATASTORAGE_DEFAULT);
-
-    if (simulatedFSDataset) {
-      storage.createStorageID(getPort());
-      // it would have been better to pass storage as a parameter to
-      // constructor below - need to augment ReflectionUtils used below.
-      conf.set(DFS_DATANODE_STORAGEID_KEY, getStorageId());
-      try {
-        data = (FSDatasetInterface) ReflectionUtils.newInstance(
-            Class.forName(
-            "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
-            conf);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(StringUtils.stringifyException(e));
+  private void initStorage(final NamespaceInfo nsInfo) throws IOException {
+    final FSDatasetInterface.Factory factory
+        = FSDatasetInterface.Factory.getFactory(conf);
+    
+    if (!factory.isSimulated()) {
+      final StartupOption startOpt = getStartupOption(conf);
+      if (startOpt == null) {
+        throw new IOException("Startup option not set.");
+      }
+      final String bpid = nsInfo.getBlockPoolID();
+      //read storage info, lock data dirs and transition fs state if necessary
+      storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+      final StorageInfo bpStorage = storage.getBPStorage(bpid);
+      LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
+          + ";bpid=" + bpid + ";lv=" + storage.getLayoutVersion()
+          + ";nsInfo=" + nsInfo);
+    }
+
+    synchronized(this)  {
+      if (data == null) {
+        data = factory.createFSDatasetInterface(this, storage, conf);
       }
-    } else {
-      data = new FSDataset(this, storage, conf);
     }
   }
 

+ 12 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -75,6 +75,16 @@ import org.apache.hadoop.util.ReflectionUtils;
  ***************************************************/
 @InterfaceAudience.Private
 class FSDataset implements FSDatasetInterface {
+  /**
+   * A factory for creating FSDataset objects.
+   */
+  static class Factory extends FSDatasetInterface.Factory {
+    @Override
+    public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
+        DataStorage storage, Configuration conf) throws IOException {
+      return new FSDataset(datanode, storage, conf);
+    }
+  }
 
   /**
    * A node type that can be built into a tree reflecting the
@@ -1056,8 +1066,8 @@ class FSDataset implements FSDatasetInterface {
   /**
    * An FSDataset has a directory where it loads its data files.
    */
-  FSDataset(DataNode datanode, DataStorage storage, Configuration conf)
-      throws IOException {
+  private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
+      ) throws IOException {
     this.datanode = datanode;
     this.maxBlocksPerDir = 
       conf.getInt(DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,

+ 26 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java

@@ -29,6 +29,7 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
@@ -49,6 +51,30 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  */
 @InterfaceAudience.Private
 public interface FSDatasetInterface extends FSDatasetMBean {
+  /**
+   * A factory for creating FSDatasetInterface objects.
+   */
+  public abstract class Factory {
+    /** @return the configured factory. */
+    public static Factory getFactory(Configuration conf) {
+      final Class<? extends Factory> clazz = conf.getClass(
+          DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+          FSDataset.Factory.class,
+          Factory.class);
+      return ReflectionUtils.newInstance(clazz, conf);
+    }
+
+    /** Create a FSDatasetInterface object. */
+    public abstract FSDatasetInterface createFSDatasetInterface(
+        DataNode datanode, DataStorage storage, Configuration conf
+        ) throws IOException;
+
+    /** Does the factory create simulated objects? */
+    public boolean isSimulated() {
+      return false;
+    }
+  }
+
   /**
    * This is an interface for the underlying volume.
    * @see org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -4413,7 +4413,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         DatanodeInfo[] nodes = blocks[i].getLocations();
         for (int j = 0; j < nodes.length; j++) {
           DatanodeInfo dn = nodes[j];
-          blockManager.findAndMarkBlockAsCorrupt(blk, dn);
+          blockManager.findAndMarkBlockAsCorrupt(blk, dn,
+              "client machine reported it");
         }
       }
     } finally {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -361,7 +361,7 @@
 
 <property>
   <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
-  <value>false</value>
+  <value>true</value>
   <description>
     If there is a datanode/network failure in the write pipeline,
     DFSClient will try to remove the failed datanode from the pipeline

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DataNodeCluster.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog;
 import org.apache.hadoop.net.DNS;
@@ -122,10 +123,9 @@ public class DataNodeCluster {
         }
         dataNodeDirs = args[i];
       } else if (args[i].equals("-simulated")) {
-        conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+        SimulatedFSDataset.setFactory(conf);
       } else if (args[i].equals("-inject")) {
-        if (!conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED,
-                                                                false) ) {
+        if (!FSDatasetInterface.Factory.getFactory(conf).isSimulated()) {
           System.out.print("-inject is valid only for simulated");
           printUsageExit(); 
         }
@@ -158,7 +158,7 @@ public class DataNodeCluster {
       System.exit(-1);
     }
     boolean simulated = 
-      conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, false);
+        FSDatasetInterface.Factory.getFactory(conf).isSimulated();
     System.out.println("Starting " + numDataNodes + 
           (simulated ? " Simulated " : " ") +
           " Data Nodes that will connect to Name Node at " + nameNodeAdr);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -982,7 +982,7 @@ public class MiniDFSCluster {
         conf.set(DFS_DATANODE_DATA_DIR_KEY, dirs);
       }
       if (simulatedCapacities != null) {
-        dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+        SimulatedFSDataset.setFactory(dnConf);
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java

@@ -107,7 +107,7 @@ public class TestFileAppend{
   public void testCopyOnWrite() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -178,7 +178,7 @@ public class TestFileAppend{
   public void testSimpleFlush() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -234,7 +234,7 @@ public class TestFileAppend{
   public void testComplexFlush() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -283,7 +283,7 @@ public class TestFileAppend{
   public void testFileNotFound() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java

@@ -82,7 +82,7 @@ public class TestFileAppend2 extends TestCase {
   public void testSimpleAppend() throws IOException {
     final Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
     conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend4.java

@@ -77,7 +77,7 @@ public class TestFileAppend4 {
   public void setUp() throws Exception {
     this.conf = new Configuration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.setBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, true);
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java

@@ -150,7 +150,7 @@ public class TestFileCorruption extends TestCase {
       ns.writeLock();
       try {
         cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
-            blk, new DatanodeInfo(dnR));
+            blk, new DatanodeInfo(dnR), "TEST");
       } finally {
         ns.writeUnlock();
       }

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -145,7 +145,7 @@ public class TestFileCreation extends junit.framework.TestCase {
   public void testFileCreation() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -224,7 +224,7 @@ public class TestFileCreation extends junit.framework.TestCase {
   public void testDeleteOnExit() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -288,7 +288,7 @@ public class TestFileCreation extends junit.framework.TestCase {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -362,7 +362,7 @@ public class TestFileCreation extends junit.framework.TestCase {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     // create cluster
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
@@ -461,7 +461,7 @@ public class TestFileCreation extends junit.framework.TestCase {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
     conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
 
     // create cluster
@@ -600,7 +600,7 @@ public class TestFileCreation extends junit.framework.TestCase {
     Configuration conf = new HdfsConfiguration();
     System.out.println("Testing adbornal client death.");
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();
@@ -635,7 +635,7 @@ public class TestFileCreation extends junit.framework.TestCase {
   public void testFileCreationNonRecursive() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java

@@ -137,7 +137,7 @@ public class TestInjectionForSimulatedStorage extends TestCase {
       Configuration conf = new HdfsConfiguration();
       conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
       conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, checksumSize);
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
       //first time format
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
@@ -160,7 +160,7 @@ public class TestInjectionForSimulatedStorage extends TestCase {
       
       LOG.info("Restarting minicluster");
       conf = new HdfsConfiguration();
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
       conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); 
       
       cluster = new MiniDFSCluster.Builder(conf)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLargeBlock.java

@@ -175,7 +175,7 @@ public class TestLargeBlock {
 
     Configuration conf = new Configuration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

@@ -206,7 +206,7 @@ public class TestPread extends TestCase {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
     conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
     if (simulatedStorage) {
-      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     if (disableTransferTo) {
       conf.setBoolean("dfs.datanode.transferTo.allowed", false);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java

@@ -200,7 +200,7 @@ public class TestReplication extends TestCase {
     Configuration conf = new HdfsConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     if (simulated) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(numDatanodes)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSetrepIncreasing.java

@@ -28,7 +28,7 @@ public class TestSetrepIncreasing extends TestCase {
   static void setrep(int fromREP, int toREP, boolean simulatedStorage) throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "" + fromREP);
     conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java

@@ -124,7 +124,7 @@ public class TestShortCircuitLocalRead {
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
         UserGroupInformation.getCurrentUser().getShortUserName());
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();
@@ -248,7 +248,7 @@ public class TestShortCircuitLocalRead {
     conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
         UserGroupInformation.getCurrentUser().getShortUserName());
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
         .format(true).build();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSmallBlock.java

@@ -93,7 +93,7 @@ public class TestSmallBlock extends TestCase {
   public void testSmallBlock() throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (simulatedStorage) {
-      conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     conf.set(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, "1");
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -77,7 +77,7 @@ public class TestBalancer extends TestCase {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }
 

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java

@@ -83,14 +83,14 @@ public class TestCorruptReplicaInfo extends TestCase {
       DatanodeDescriptor dn1 = new DatanodeDescriptor();
       DatanodeDescriptor dn2 = new DatanodeDescriptor();
       
-      crm.addToCorruptReplicasMap(getBlock(0), dn1);
+      crm.addToCorruptReplicasMap(getBlock(0), dn1, "TEST");
       assertEquals("Number of corrupt blocks not returning correctly",
                    1, crm.size());
-      crm.addToCorruptReplicasMap(getBlock(1), dn1);
+      crm.addToCorruptReplicasMap(getBlock(1), dn1, "TEST");
       assertEquals("Number of corrupt blocks not returning correctly",
                    2, crm.size());
       
-      crm.addToCorruptReplicasMap(getBlock(1), dn2);
+      crm.addToCorruptReplicasMap(getBlock(1), dn2, "TEST");
       assertEquals("Number of corrupt blocks not returning correctly",
                    2, crm.size());
       
@@ -103,7 +103,7 @@ public class TestCorruptReplicaInfo extends TestCase {
                    0, crm.size());
       
       for (Long block_id: block_ids) {
-        crm.addToCorruptReplicasMap(getBlock(block_id), dn1);
+        crm.addToCorruptReplicasMap(getBlock(block_id), dn1, "TEST");
       }
             
       assertEquals("Number of corrupt blocks not returning correctly",

+ 37 - 43
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +31,6 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -63,21 +61,33 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  * 
  * Note the synchronization is coarse grained - it is at each method. 
  */
+public class SimulatedFSDataset implements FSDatasetInterface {
+  static class Factory extends FSDatasetInterface.Factory {
+    @Override
+    public FSDatasetInterface createFSDatasetInterface(DataNode datanode,
+        DataStorage storage, Configuration conf) throws IOException {
+      return new SimulatedFSDataset(datanode, storage, conf);
+    }
 
-public class SimulatedFSDataset  implements FSDatasetInterface, Configurable{
+    @Override
+    public boolean isSimulated() {
+      return true;
+    }
+  }
+  
+  public static void setFactory(Configuration conf) {
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        Factory.class.getName());
+  }
   
-  public static final String CONFIG_PROPERTY_SIMULATED =
-      DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_KEY;
   public static final String CONFIG_PROPERTY_CAPACITY =
-      DFSConfigKeys.DFS_DATANODE_SIMULATEDDATASTORAGE_CAPACITY_KEY;
+      "dfs.datanode.simulateddatastorage.capacity";
   
   public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
-  public static final byte DEFAULT_DATABYTE = 9; // 1 terabyte
-  byte simulatedDataByte = DEFAULT_DATABYTE;
-  Configuration conf = null;
+  public static final byte DEFAULT_DATABYTE = 9;
   
-  static byte[] nullCrcFileData;
-  {
+  static final byte[] nullCrcFileData;
+  static {
     DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
                               CHECKSUM_NULL, 16*1024 );
     byte[] nullCrcHeader = checksum.getHeader();
@@ -360,31 +370,22 @@ public class SimulatedFSDataset  implements FSDatasetInterface, Configurable{
     }
   }
   
-  private Map<String, Map<Block, BInfo>> blockMap = null;
-  private SimulatedStorage storage = null;
-  private String storageId;
-  
-  public SimulatedFSDataset(Configuration conf) {
-    setConf(conf);
-  }
+  private final Map<String, Map<Block, BInfo>> blockMap
+      = new HashMap<String, Map<Block,BInfo>>();
+  private final SimulatedStorage storage;
+  private final String storageId;
   
-  // Constructor used for constructing the object using reflection
-  @SuppressWarnings("unused")
-  private SimulatedFSDataset() { // real construction when setConf called..
-  }
-  
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration iconf)  {
-    conf = iconf;
-    storageId = conf.get(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, "unknownStorageId" +
-                                        new Random().nextInt());
+  public SimulatedFSDataset(DataNode datanode, DataStorage storage,
+      Configuration conf) {
+    if (storage != null) {
+      storage.createStorageID(datanode.getPort());
+      this.storageId = storage.getStorageID();
+    } else {
+      this.storageId = "unknownStorageId" + new Random().nextInt();
+    }
     registerMBean(storageId);
-    storage = new SimulatedStorage(
+    this.storage = new SimulatedStorage(
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
-    blockMap = new HashMap<String, Map<Block,BInfo>>(); 
   }
 
   public synchronized void injectBlocks(String bpid,
@@ -441,23 +442,16 @@ public class SimulatedFSDataset  implements FSDatasetInterface, Configurable{
 
   @Override
   public synchronized BlockListAsLongs getBlockReport(String bpid) {
+    final List<Block> blocks = new ArrayList<Block>();
     final Map<Block, BInfo> map = blockMap.get(bpid);
-    Block[] blockTable = new Block[map.size()];
     if (map != null) {
-      int count = 0;
       for (BInfo b : map.values()) {
         if (b.isFinalized()) {
-          blockTable[count++] = b.theBlock;
+          blocks.add(b.theBlock);
         }
       }
-      if (count != blockTable.length) {
-        blockTable = Arrays.copyOf(blockTable, count);
-      }
-    } else {
-      blockTable = new Block[0];
     }
-    return new BlockListAsLongs(
-        new ArrayList<Block>(Arrays.asList(blockTable)), null);
+    return new BlockListAsLongs(blocks, null);
   }
 
   @Override // FSDatasetMBean

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java

@@ -94,7 +94,7 @@ public class TestBPOfferService {
     .when(mockDn).getMetrics();
 
     // Set up a simulated dataset with our fake BP
-    mockFSDataset = Mockito.spy(new SimulatedFSDataset(conf));
+    mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, null, conf));
     mockFSDataset.addBlockPool(FAKE_BPID, conf);
 
     // Wire the dataset to the DN.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java

@@ -34,7 +34,7 @@ public class TestDataNodeMetrics extends TestCase {
   
   public void testDataNodeMetrics() throws Exception {
     Configuration conf = new HdfsConfiguration();
-    conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    SimulatedFSDataset.setFactory(conf);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     try {
       FileSystem fs = cluster.getFileSystem();

+ 16 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -44,8 +44,8 @@ public class TestSimulatedFSDataset extends TestCase {
 
   protected void setUp() throws Exception {
     super.setUp();
-      conf = new HdfsConfiguration();
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    conf = new HdfsConfiguration();
+    SimulatedFSDataset.setFactory(conf);
   }
 
   protected void tearDown() throws Exception {
@@ -86,6 +86,18 @@ public class TestSimulatedFSDataset extends TestCase {
   int addSomeBlocks(FSDatasetInterface fsdataset ) throws IOException {
     return addSomeBlocks(fsdataset, 1);
   }
+  
+  public void testFSDatasetFactory() {
+    final Configuration conf = new Configuration();
+    FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf);
+    assertEquals(FSDataset.Factory.class, f.getClass());
+    assertFalse(f.isSimulated());
+
+    SimulatedFSDataset.setFactory(conf);
+    FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf);
+    assertEquals(SimulatedFSDataset.Factory.class, s.getClass());
+    assertTrue(s.isSimulated());
+  }
 
   public void testGetMetaData() throws IOException {
     FSDatasetInterface fsdataset = getSimulatedFSDataset(); 
@@ -287,8 +299,8 @@ public class TestSimulatedFSDataset extends TestCase {
     }
   }
   
-  private SimulatedFSDataset getSimulatedFSDataset() throws IOException {
-    SimulatedFSDataset fsdataset = new SimulatedFSDataset(conf); 
+  private SimulatedFSDataset getSimulatedFSDataset() {
+    SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, null, conf); 
     fsdataset.addBlockPool(bpid, conf);
     return fsdataset;
   }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileLimit.java

@@ -84,7 +84,7 @@ public class TestFileLimit extends TestCase {
     int currentNodes = 0;
     
     if (simulatedStorage) {
-      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+      SimulatedFSDataset.setFactory(conf);
     }
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
     FileSystem fs = cluster.getFileSystem();

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java

@@ -174,7 +174,8 @@ public class TestNameNodeMetrics {
         cluster.getNameNode(), file.toString(), 0, 1).get(0);
     cluster.getNamesystem().writeLock();
     try {
-      bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
+      bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
+          "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
@@ -218,7 +219,8 @@ public class TestNameNodeMetrics {
         cluster.getNameNode(), file.toString(), 0, 1).get(0);
     cluster.getNamesystem().writeLock();
     try {
-      bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
+      bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
+          "TEST");
     } finally {
       cluster.getNamesystem().writeUnlock();
     }

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -14,6 +14,8 @@ Trunk (unreleased changes)
     (Plamen Jeliazkov via shv)
 
   IMPROVEMENTS
+    MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for 
+                    faster job submission. (amarrk)
 
     MAPREDUCE-3481. [Gridmix] Improve Gridmix STRESS mode. (amarrk)
 
@@ -132,6 +134,8 @@ Release 0.23.2 - UNRELEASED
 
     MAPREDUCE-3798. Fixed failing TestJobCleanup.testCusomCleanup() and moved it
     to the maven build. (Ravi Prakash via vinodkv)
+
+    MAPREDUCE-3884. PWD should be first in the classpath of MR tasks (tucu)
  
 Release 0.23.1 - 2012-02-17 
 

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java

@@ -230,6 +230,9 @@ public class MRApps extends Apps {
     boolean userClassesTakesPrecedence = 
       conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
 
+    Apps.addToEnvironment(environment,
+      Environment.CLASSPATH.name(),
+      Environment.PWD.$());
     if (!userClassesTakesPrecedence) {
       MRApps.setMRFrameworkClasspath(environment, conf);
     }

+ 3 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java

@@ -130,7 +130,7 @@ public class TestMRApps {
     Job job = Job.getInstance();
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, job.getConfiguration());
-    assertEquals("$HADOOP_CONF_DIR:" +
+    assertEquals("$PWD:$HADOOP_CONF_DIR:" +
         "$HADOOP_COMMON_HOME/share/hadoop/common/*:" +
         "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*:" +
         "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*:" +
@@ -152,7 +152,7 @@ public class TestMRApps {
     }
     String env_str = env.get("CLASSPATH");
     assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
-      env_str.indexOf("job.jar"), 0);
+      env_str.indexOf("$PWD:job.jar"), 0);
   }
 
   @Test public void testSetClasspathWithNoUserPrecendence() {
@@ -166,7 +166,7 @@ public class TestMRApps {
     }
     String env_str = env.get("CLASSPATH");
     assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
-      env_str.indexOf("job.jar"), 0);
+      env_str.indexOf("$PWD:job.jar"), 0);
   }
 
 }

+ 26 - 18
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ExecutionSummarizer.java

@@ -53,6 +53,7 @@ class ExecutionSummarizer implements StatListener<JobStats> {
   private int numJobsInInputTrace;
   private int totalSuccessfulJobs;
   private int totalFailedJobs;
+  private int totalLostJobs;
   private int totalMapTasksLaunched;
   private int totalReduceTasksLaunched;
   private long totalSimulationTime;
@@ -90,31 +91,32 @@ class ExecutionSummarizer implements StatListener<JobStats> {
     simulationStartTime = System.currentTimeMillis();
   }
   
-  private void processJobState(JobStats stats) throws Exception {
+  private void processJobState(JobStats stats) {
     Job job = stats.getJob();
-    if (job.isSuccessful()) {
-      ++totalSuccessfulJobs;
-    } else {
-      ++totalFailedJobs;
+    try {
+      if (job.isSuccessful()) {
+        ++totalSuccessfulJobs;
+      } else {
+        ++totalFailedJobs;
+      }
+    } catch (Exception e) {
+      // this behavior is consistent with job-monitor which marks the job as 
+      // complete (lost) if the status polling bails out
+      ++totalLostJobs; 
     }
   }
   
-  private void processJobTasks(JobStats stats) throws Exception {
+  private void processJobTasks(JobStats stats) {
     totalMapTasksLaunched += stats.getNoOfMaps();
-    Job job = stats.getJob();
-    totalReduceTasksLaunched += job.getNumReduceTasks();
+    totalReduceTasksLaunched += stats.getNoOfReds();
   }
   
   private void process(JobStats stats) {
-    try {
-      // process the job run state
-      processJobState(stats);
-      
-      // process the tasks information
-      processJobTasks(stats);
-    } catch (Exception e) {
-      LOG.info("Error in processing job " + stats.getJob().getJobID() + ".");
-    }
+    // process the job run state
+    processJobState(stats);
+
+    // process the tasks information
+    processJobTasks(stats);
   }
   
   @Override
@@ -191,6 +193,8 @@ class ExecutionSummarizer implements StatListener<JobStats> {
            .append(getNumSuccessfulJobs());
     builder.append("\nTotal number of failed jobs: ")
            .append(getNumFailedJobs());
+    builder.append("\nTotal number of lost jobs: ")
+           .append(getNumLostJobs());
     builder.append("\nTotal number of map tasks launched: ")
            .append(getNumMapTasksLaunched());
     builder.append("\nTotal number of reduce task launched: ")
@@ -266,8 +270,12 @@ class ExecutionSummarizer implements StatListener<JobStats> {
     return totalFailedJobs;
   }
   
+  protected int getNumLostJobs() {
+    return totalLostJobs;
+  }
+  
   protected int getNumSubmittedJobs() {
-    return totalSuccessfulJobs + totalFailedJobs;
+    return totalSuccessfulJobs + totalFailedJobs + totalLostJobs;
   }
   
   protected int getNumMapTasksLaunched() {

+ 43 - 6
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.gridmix.GenerateData.DataStatistics;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -93,6 +94,31 @@ public class Gridmix extends Configured implements Tool {
    */
   public static final String GRIDMIX_USR_RSV = "gridmix.user.resolve.class";
 
+  /**
+   * The configuration key which determines the duration for which the 
+   * job-monitor sleeps while polling for job status.
+   * This value should be specified in milliseconds.
+   */
+  public static final String GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS = 
+    "gridmix.job-monitor.sleep-time-ms";
+  
+  /**
+   * Default value for {@link #GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS}.
+   */
+  public static final int GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT = 500;
+  
+  /**
+   * The configuration key which determines the total number of job-status
+   * monitoring threads.
+   */
+  public static final String GRIDMIX_JOBMONITOR_THREADS = 
+    "gridmix.job-monitor.thread-count";
+  
+  /**
+   * Default value for {@link #GRIDMIX_JOBMONITOR_THREADS}.
+   */
+  public static final int GRIDMIX_JOBMONITOR_THREADS_DEFAULT = 1;
+  
   /**
    * Configuration property set in simulated job's configuration whose value is
    * set to the corresponding original job's name. This is not configurable by
@@ -185,8 +211,13 @@ public class Gridmix extends Configured implements Tool {
     submitter.add(job);
 
     // TODO add listeners, use for job dependencies
-    TimeUnit.SECONDS.sleep(10);
     try {
+      while (!job.isSubmitted()) {
+        try {
+            Thread.sleep(100); // sleep
+          } catch (InterruptedException ie) {}
+      }
+      // wait for completion
       job.getJob().waitForCompletion(false);
     } catch (ClassNotFoundException e) {
       throw new IOException("Internal error", e);
@@ -241,7 +272,7 @@ public class Gridmix extends Configured implements Tool {
       GridmixJobSubmissionPolicy policy = getJobSubmissionPolicy(conf);
       LOG.info(" Submission policy is " + policy.name());
       statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
-      monitor = createJobMonitor(statistics);
+      monitor = createJobMonitor(statistics, conf);
       int noOfSubmitterThreads = 
         (policy == GridmixJobSubmissionPolicy.SERIAL) 
         ? 1
@@ -276,8 +307,13 @@ public class Gridmix extends Configured implements Tool {
     }
    }
 
-  protected JobMonitor createJobMonitor(Statistics stats) throws IOException {
-    return new JobMonitor(stats);
+  protected JobMonitor createJobMonitor(Statistics stats, Configuration conf) 
+  throws IOException {
+    int delay = conf.getInt(GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS, 
+                            GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS_DEFAULT);
+    int numThreads = conf.getInt(GRIDMIX_JOBMONITOR_THREADS, 
+                                 GRIDMIX_JOBMONITOR_THREADS_DEFAULT);
+    return new JobMonitor(delay, TimeUnit.MILLISECONDS, stats, numThreads);
   }
 
   protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
@@ -571,12 +607,13 @@ public class Gridmix extends Configured implements Tool {
         if (monitor == null) {
           return;
         }
-        List<Job> remainingJobs = monitor.getRemainingJobs();
+        List<JobStats> remainingJobs = monitor.getRemainingJobs();
         if (remainingJobs.isEmpty()) {
           return;
         }
         LOG.info("Killing running jobs...");
-        for (Job job : remainingJobs) {
+        for (JobStats stats : remainingJobs) {
+          Job job = stats.getJob();
           try {
             if (!job.isComplete()) {
               job.killJob();

+ 9 - 0
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java

@@ -72,6 +72,7 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
       }
     };
 
+  private boolean submitted;
   protected final int seq;
   protected final Path outdir;
   protected final Job job;
@@ -412,6 +413,14 @@ abstract class GridmixJob implements Callable<Job>, Delayed {
     return jobdesc;
   }
 
+  void setSubmitted() {
+    submitted = true;
+  }
+  
+  boolean isSubmitted() {
+    return submitted;
+  }
+  
   static void pushDescription(int seq, List<InputSplit> splits) {
     if (null != descCache.putIfAbsent(seq, splits)) {
       throw new IllegalArgumentException("Description exists for id " + seq);

+ 22 - 5
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobFactory.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,8 @@ import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -179,19 +182,33 @@ abstract class JobFactory<T> implements Gridmix.Component<Void>,StatListener<T>
   
   protected JobStory getNextJobFiltered() throws IOException {
     JobStory job = getNextJobFromTrace();
+    // filter out the following jobs
+    //    - unsuccessful jobs
+    //    - jobs with missing submit-time
+    //    - reduce only jobs
+    // These jobs are not yet supported in Gridmix
     while (job != null &&
       (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS ||
-        job.getSubmissionTime() < 0)) {
+        job.getSubmissionTime() < 0 || job.getNumberMaps() == 0)) {
       if (LOG.isDebugEnabled()) {
-        String reason = null;
+        List<String> reason = new ArrayList<String>();
         if (job.getOutcome() != Pre21JobHistoryConstants.Values.SUCCESS) {
-          reason = "STATE (" + job.getOutcome().name() + ") ";
+          reason.add("STATE (" + job.getOutcome().name() + ")");
         }
         if (job.getSubmissionTime() < 0) {
-          reason += "SUBMISSION-TIME (" + job.getSubmissionTime() + ")";
+          reason.add("SUBMISSION-TIME (" + job.getSubmissionTime() + ")");
         }
+        if (job.getNumberMaps() == 0) {
+          reason.add("ZERO-MAPS-JOB");
+        }
+        
+        // TODO This should never happen. Probably we missed something!
+        if (reason.size() == 0) {
+          reason.add("N/A");
+        }
+        
         LOG.debug("Ignoring job " + job.getJobID() + " from the input trace."
-                  + " Reason: " + reason == null ? "N/A" : reason);
+                  + " Reason: " + StringUtils.join(reason, ","));
       }
       job = getNextJobFromTrace();
     }

+ 115 - 70
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobMonitor.java

@@ -24,37 +24,47 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
 
 /**
- * Component accepting submitted, running jobs and responsible for
- * monitoring jobs for success and failure. Once a job is submitted, it is
- * polled for status until complete. If a job is complete, then the monitor
- * thread returns immediately to the queue. If not, the monitor will sleep
- * for some duration.
+ * Component accepting submitted, running {@link Statistics.JobStats} and 
+ * responsible for monitoring jobs for success and failure. Once a job is 
+ * submitted, it is polled for status until complete. If a job is complete, 
+ * then the monitor thread returns immediately to the queue. If not, the monitor
+ * will sleep for some duration.
+ * 
+ * {@link JobMonitor} can be configured to use multiple threads for polling
+ * the job statuses. Use {@link Gridmix#GRIDMIX_JOBMONITOR_THREADS} to specify
+ * the total number of monitoring threads. 
+ * 
+ * The duration for which a monitoring thread sleeps if the first job in the 
+ * queue is running can also be configured. Use 
+ * {@link Gridmix#GRIDMIX_JOBMONITOR_SLEEPTIME_MILLIS} to specify a custom 
+ * value.
  */
-class JobMonitor implements Gridmix.Component<Job> {
+class JobMonitor implements Gridmix.Component<JobStats> {
 
   public static final Log LOG = LogFactory.getLog(JobMonitor.class);
 
-  private final Queue<Job> mJobs;
-  private final MonitorThread mThread;
-  private final BlockingQueue<Job> runningJobs;
+  private final Queue<JobStats> mJobs;
+  private ExecutorService executor;
+  private int numPollingThreads;
+  private final BlockingQueue<JobStats> runningJobs;
   private final long pollDelayMillis;
   private Statistics statistics;
   private boolean graceful = false;
   private boolean shutdown = false;
 
-  public JobMonitor(Statistics statistics) {
-    this(5,TimeUnit.SECONDS, statistics);
-  }
-
   /**
    * Create a JobMonitor that sleeps for the specified duration after
    * polling a still-running job.
@@ -62,30 +72,37 @@ class JobMonitor implements Gridmix.Component<Job> {
    * @param unit Time unit for pollDelaySec (rounded to milliseconds)
    * @param statistics StatCollector , listener to job completion.
    */
-  public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics) {
-    mThread = new MonitorThread();
-    runningJobs = new LinkedBlockingQueue<Job>();
-    mJobs = new LinkedList<Job>();
+  public JobMonitor(int pollDelay, TimeUnit unit, Statistics statistics, 
+                    int numPollingThreads) {
+    executor = Executors.newCachedThreadPool();
+    this.numPollingThreads = numPollingThreads;
+    runningJobs = new LinkedBlockingQueue<JobStats>();
+    mJobs = new LinkedList<JobStats>();
     this.pollDelayMillis = TimeUnit.MILLISECONDS.convert(pollDelay, unit);
     this.statistics = statistics;
   }
 
   /**
-   * Add a job to the polling queue.
+   * Add a running job's status to the polling queue.
    */
-  public void add(Job job) throws InterruptedException {
-    runningJobs.put(job);
+  public void add(JobStats job) throws InterruptedException {
+    synchronized (runningJobs) {
+      runningJobs.put(job);
+    }
   }
 
   /**
-   * Add a submission failed job , such that it can be communicated
+   * Add a submission failed job's status, such that it can be communicated
    * back to serial.
    * TODO: Cleaner solution for this problem
    * @param job
    */
-  public void submissionFailed(Job job) {
-    LOG.info("Job submission failed notification for job " + job.getJobID());
-    this.statistics.add(job);
+  public void submissionFailed(JobStats job) {
+    String jobID = job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
+    LOG.info("Job submission failed notification for job " + jobID);
+    synchronized (statistics) {
+      this.statistics.add(job);
+    }
   }
 
   /**
@@ -108,12 +125,9 @@ class JobMonitor implements Gridmix.Component<Job> {
    * @throws IllegalStateException If monitoring thread is still running.
    * @return Any jobs submitted and not known to have completed.
    */
-  List<Job> getRemainingJobs() {
-    if (mThread.isAlive()) {
-      LOG.warn("Internal error: Polling running monitor for jobs");
-    }
+  List<JobStats> getRemainingJobs() {
     synchronized (mJobs) {
-      return new ArrayList<Job>(mJobs);
+      return new ArrayList<JobStats>(mJobs);
     }
   }
 
@@ -123,19 +137,8 @@ class JobMonitor implements Gridmix.Component<Job> {
    */
   private class MonitorThread extends Thread {
 
-    public MonitorThread() {
-      super("GridmixJobMonitor");
-    }
-
-    /**
-     * Check a job for success or failure.
-     */
-    public void process(Job job) throws IOException, InterruptedException {
-      if (job.isSuccessful()) {
-        onSuccess(job);
-      } else {
-        onFailure(job);
-      }
+    public MonitorThread(int i) {
+      super("GridmixJobMonitor-" + i);
     }
 
     @Override
@@ -144,10 +147,12 @@ class JobMonitor implements Gridmix.Component<Job> {
       boolean shutdown;
       while (true) {
         try {
-          synchronized (mJobs) {
-            graceful = JobMonitor.this.graceful;
-            shutdown = JobMonitor.this.shutdown;
-            runningJobs.drainTo(mJobs);
+          synchronized (runningJobs) {
+            synchronized (mJobs) {
+              graceful = JobMonitor.this.graceful;
+              shutdown = JobMonitor.this.shutdown;
+              runningJobs.drainTo(mJobs);
+            }
           }
 
           // shutdown conditions; either shutdown requested and all jobs
@@ -155,26 +160,63 @@ class JobMonitor implements Gridmix.Component<Job> {
           // submitted jobs not in the monitored set
           if (shutdown) {
             if (!graceful) {
-              while (!runningJobs.isEmpty()) {
-                synchronized (mJobs) {
-                  runningJobs.drainTo(mJobs);
+              synchronized (runningJobs) {
+                while (!runningJobs.isEmpty()) {
+                  synchronized (mJobs) {
+                    runningJobs.drainTo(mJobs);
+                  }
                 }
               }
               break;
-            } else if (mJobs.isEmpty()) {
-              break;
             }
-          }
-          while (!mJobs.isEmpty()) {
-            Job job;
+            
             synchronized (mJobs) {
-              job = mJobs.poll();
+              if (graceful && mJobs.isEmpty()) {
+                break;
+              }
             }
+          }
+          JobStats jobStats = null;
+          synchronized (mJobs) {
+            jobStats = mJobs.poll();
+          }
+          while (jobStats != null) {
+            Job job = jobStats.getJob();
+            
             try {
-              if (job.isComplete()) {
-                process(job);
-                statistics.add(job);
-                continue;
+              // get the job status
+              long start = System.currentTimeMillis();
+              JobStatus status = job.getStatus(); // cache the job status
+              long end = System.currentTimeMillis();
+              
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Status polling for job " + job.getJobID() + " took "
+                          + (end-start) + "ms.");
+              }
+              
+              // update the job progress
+              jobStats.updateJobStatus(status);
+              
+              // if the job is complete, let others know
+              if (status.isJobComplete()) {
+                if (status.getState() == JobStatus.State.SUCCEEDED) {
+                  onSuccess(job);
+                } else {
+                  onFailure(job);
+                }
+                synchronized (statistics) {
+                  statistics.add(jobStats);
+                }
+              } else {
+                // add the running job back and break
+                synchronized (mJobs) {
+                  if (!mJobs.offer(jobStats)) {
+                    LOG.error("Lost job " + (null == job.getJobName()
+                         ? "<unknown>" : job.getJobName())); // should never
+                                                             // happen
+                  }
+                }
+                break;
               }
             } catch (IOException e) {
               if (e.getCause() instanceof ClosedByInterruptException) {
@@ -186,18 +228,19 @@ class JobMonitor implements Gridmix.Component<Job> {
               } else {
                 LOG.warn("Lost job " + (null == job.getJobName()
                      ? "<unknown>" : job.getJobName()), e);
-                continue;
+                synchronized (statistics) {
+                  statistics.add(jobStats);
+                }
               }
             }
+            
+            // get the next job
             synchronized (mJobs) {
-              if (!mJobs.offer(job)) {
-                LOG.error("Lost job " + (null == job.getJobName()
-                     ? "<unknown>" : job.getJobName())); // should never
-                                                         // happen
-              }
+              jobStats = mJobs.poll();
             }
-            break;
           }
+          
+          // sleep for a while before checking again
           try {
             TimeUnit.MILLISECONDS.sleep(pollDelayMillis);
           } catch (InterruptedException e) {
@@ -215,7 +258,9 @@ class JobMonitor implements Gridmix.Component<Job> {
    * Start the internal, monitoring thread.
    */
   public void start() {
-    mThread.start();
+    for (int i = 0; i < numPollingThreads; ++i) {
+      executor.execute(new MonitorThread(i));
+    }
   }
 
   /**
@@ -224,7 +269,7 @@ class JobMonitor implements Gridmix.Component<Job> {
    * if no form of shutdown has been requested.
    */
   public void join(long millis) throws InterruptedException {
-    mThread.join(millis);
+    executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -236,7 +281,7 @@ class JobMonitor implements Gridmix.Component<Job> {
       graceful = false;
       shutdown = true;
     }
-    mThread.interrupt();
+    executor.shutdown();
   }
 
   /**
@@ -248,7 +293,7 @@ class JobMonitor implements Gridmix.Component<Job> {
       graceful = true;
       shutdown = true;
     }
-    mThread.interrupt();
+    executor.shutdown();
   }
 }
 

+ 39 - 10
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java

@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
 
 /**
  * Component accepting deserialized job traces, computing split data, and
@@ -46,6 +47,7 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
   private final JobMonitor monitor;
   private final ExecutorService sched;
   private volatile boolean shutdown = false;
+  private final int queueDepth;
 
   /**
    * Initialize the submission component with downstream monitor and pool of
@@ -61,6 +63,7 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
    */
   public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
       FilePool inputDir, Statistics statistics) {
+    this.queueDepth = queueDepth;
     sem = new Semaphore(queueDepth);
     sched = new ThreadPoolExecutor(threads, threads, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
@@ -79,19 +82,25 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
       this.job = job;
     }
     public void run() {
+      JobStats stats = 
+        Statistics.generateJobStats(job.getJob(), job.getJobDesc());
       try {
         // pre-compute split information
         try {
+          long start = System.currentTimeMillis();
           job.buildSplits(inputDir);
+          long end = System.currentTimeMillis();
+          LOG.info("[JobSubmitter] Time taken to build splits for job " 
+                   + job.getJob().getJobID() + ": " + (end - start) + " ms.");
         } catch (IOException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " 
                    + job.getUgi(), e);
-          monitor.submissionFailed(job.getJob());
+          monitor.submissionFailed(stats);
           return;
         } catch (Exception e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " 
                    + job.getUgi(), e);
-          monitor.submissionFailed(job.getJob());
+          monitor.submissionFailed(stats);
           return;
         }
         // Sleep until deadline
@@ -102,10 +111,28 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
         }
         try {
           // submit job
-          monitor.add(job.call());
-          statistics.addJobStats(job.getJob(), job.getJobDesc());
-          LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
-              " (" + job.getJob().getJobID() + ")");
+          long start = System.currentTimeMillis();
+          job.call();
+          long end = System.currentTimeMillis();
+          LOG.info("[JobSubmitter] Time taken to submit the job " 
+                   + job.getJob().getJobID() + ": " + (end - start) + " ms.");
+          
+          // mark it as submitted
+          job.setSubmitted();
+          
+          // add to the monitor
+          monitor.add(stats);
+          
+          // add to the statistics
+          statistics.addJobStats(stats);
+          if (LOG.isDebugEnabled()) {
+            String jobID = 
+              job.getJob().getConfiguration().get(Gridmix.ORIGINAL_JOB_ID);
+            LOG.debug("Original job '" + jobID + "' is being simulated as '" 
+                      + job.getJob().getJobID() + "'");
+            LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() 
+                      + " (" + job.getJob().getJobID() + ")");
+          }
         } catch (IOException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName() + " as " 
                    + job.getUgi(), e);
@@ -113,21 +140,21 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
             throw new InterruptedException("Failed to submit " +
                 job.getJob().getJobName());
           }
-          monitor.submissionFailed(job.getJob());
+          monitor.submissionFailed(stats);
         } catch (ClassNotFoundException e) {
           LOG.warn("Failed to submit " + job.getJob().getJobName(), e);
-          monitor.submissionFailed(job.getJob());
+          monitor.submissionFailed(stats);
         }
       } catch (InterruptedException e) {
         // abort execution, remove splits if nesc
         // TODO release ThdLoc
         GridmixJob.pullDescription(job.id());
         Thread.currentThread().interrupt();
-        monitor.submissionFailed(job.getJob());
+        monitor.submissionFailed(stats);
       } catch(Exception e) {
         //Due to some exception job wasnt submitted.
         LOG.info(" Job " + job.getJob().getJobID() + " submission failed " , e);
-        monitor.submissionFailed(job.getJob());
+        monitor.submissionFailed(stats);
       } finally {
         sem.release();
       }
@@ -141,6 +168,8 @@ class JobSubmitter implements Gridmix.Component<GridmixJob> {
     final boolean addToQueue = !shutdown;
     if (addToQueue) {
       final SubmitTask task = new SubmitTask(job);
+      LOG.info("Total number of queued jobs: " 
+               + (queueDepth - sem.availablePermits()));
       sem.acquire();
       try {
         sched.execute(task);

+ 92 - 33
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 
@@ -43,12 +44,12 @@ import java.util.concurrent.locks.ReentrantLock;
 /**
  * Component collecting the stats required by other components
  * to make decisions.
- * Single thread Collector tries to collec the stats.
- * Each of thread poll updates certain datastructure(Currently ClusterStats).
- * Components interested in these datastructure, need to register.
- * StatsCollector notifies each of the listeners.
+ * Single thread collector tries to collect the stats (currently cluster stats)
+ * and caches it internally.
+ * Components interested in these stats need to register themselves and will get
+ * notified either on every job completion event or some fixed time interval.
  */
-public class Statistics implements Component<Job> {
+public class Statistics implements Component<Statistics.JobStats> {
   public static final Log LOG = LogFactory.getLog(Statistics.class);
 
   private final StatCollector statistics = new StatCollector();
@@ -62,10 +63,16 @@ public class Statistics implements Component<Job> {
   private final List<StatListener<JobStats>> jobStatListeners =
     new CopyOnWriteArrayList<StatListener<JobStats>>();
 
-  //List of jobids and noofMaps for each job
-  private static final Map<Integer, JobStats> jobMaps =
-    new ConcurrentHashMap<Integer,JobStats>();
+  // A map of job-sequence-id to job-stats of submitted jobs
+  private static final Map<Integer, JobStats> submittedJobsMap =
+    new ConcurrentHashMap<Integer, JobStats>();
+  
+  // total number of map tasks submitted
+  private static volatile int numMapsSubmitted = 0;
 
+  // total number of reduce tasks submitted
+  private static volatile int numReducesSubmitted = 0;
+  
   private int completedJobsInCurrentInterval = 0;
   private final int jtPollingInterval;
   private volatile boolean shutdown = false;
@@ -92,41 +99,65 @@ public class Statistics implements Component<Job> {
     this.startFlag = startFlag;
   }
 
-  public void addJobStats(Job job, JobStory jobdesc) {
+  /**
+   * Generates a job stats.
+   */
+  public static JobStats generateJobStats(Job job, JobStory jobdesc) {
     int seq = GridmixJob.getJobSeqId(job);
-    if (seq < 0) {
-      LOG.info("Not tracking job " + job.getJobName()
-               + " as seq id is less than zero: " + seq);
-      return;
+    // bail out if job description is missing for a job to be simulated
+    if (seq >= 0 && jobdesc == null) {
+      throw new IllegalArgumentException("JobStory not available for job " 
+                                         + job.getJobID());
     }
     
-    int maps = 0;
-    int reds = 0;
-    if (jobdesc == null) {
-      throw new IllegalArgumentException(
-        " JobStory not available for job " + job.getJobName());
-    } else {
+    int maps = -1;
+    int reds = -1;
+    if (jobdesc != null) {
+      // Note that the ZombieJob will return a >= 0 value
       maps = jobdesc.getNumberMaps();
       reds = jobdesc.getNumberReduces();
     }
-    JobStats stats = new JobStats(maps, reds, job);
-    jobMaps.put(seq,stats);
+    return new JobStats(maps, reds, job);
+  }
+  
+  /**
+   * Add a submitted job for monitoring.
+   */
+  public void addJobStats(JobStats stats) {
+    int seq = GridmixJob.getJobSeqId(stats.getJob());
+    if (seq < 0) {
+      LOG.info("Not tracking job " + stats.getJob().getJobName()
+               + " as seq id is less than zero: " + seq);
+      return;
+    }
+    submittedJobsMap.put(seq, stats);
+    numMapsSubmitted += stats.getNoOfMaps();
+    numReducesSubmitted += stats.getNoOfReds();
   }
 
   /**
    * Used by JobMonitor to add the completed job.
    */
   @Override
-  public void add(Job job) {
-    //This thread will be notified initially by jobmonitor incase of
+  public void add(Statistics.JobStats job) {
+    //This thread will be notified initially by job-monitor incase of
     //data generation. Ignore that as we are getting once the input is
     //generated.
     if (!statistics.isAlive()) {
       return;
     }
-    JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job));
-
-    if (stat == null) return;
+    JobStats stat = submittedJobsMap.remove(GridmixJob.getJobSeqId(job.getJob()));
+    
+    // stat cannot be null
+    if (stat == null) {
+      LOG.error("[Statistics] Missing entry for job " 
+                + job.getJob().getJobID());
+      return;
+    }
+    
+    // update the total number of submitted map/reduce task count
+    numMapsSubmitted -= stat.getNoOfMaps();
+    numReducesSubmitted -= stat.getNoOfReds();
     
     completedJobsInCurrentInterval++;
     //check if we have reached the maximum level of job completions.
@@ -238,7 +269,7 @@ public class Statistics implements Component<Job> {
   @Override
   public void shutdown() {
     shutdown = true;
-    jobMaps.clear();
+    submittedJobsMap.clear();
     clusterStatlisteners.clear();
     jobStatListeners.clear();
     statistics.interrupt();
@@ -247,7 +278,7 @@ public class Statistics implements Component<Job> {
   @Override
   public void abort() {
     shutdown = true;
-    jobMaps.clear();
+    submittedJobsMap.clear();
     clusterStatlisteners.clear();
     jobStatListeners.clear();
     statistics.interrupt();
@@ -259,9 +290,10 @@ public class Statistics implements Component<Job> {
    * TODO: In future we need to extend this to send more information.
    */
   static class JobStats {
-    private int noOfMaps;
-    private int noOfReds;
-    private Job job;
+    private final int noOfMaps;
+    private final int noOfReds;
+    private JobStatus currentStatus;
+    private final Job job;
 
     public JobStats(int noOfMaps,int numOfReds, Job job){
       this.job = job;
@@ -284,6 +316,20 @@ public class Statistics implements Component<Job> {
     public Job getJob() {
       return job;
     }
+    
+    /**
+     * Update the job statistics.
+     */
+    public synchronized void updateJobStatus(JobStatus status) {
+      this.currentStatus = status;
+    }
+    
+    /**
+     * Get the current job status.
+     */
+    public synchronized JobStatus getJobStatus() {
+      return currentStatus;
+    }
   }
 
   static class ClusterStats {
@@ -316,15 +362,28 @@ public class Statistics implements Component<Job> {
     }
 
     int getNumRunningJob() {
-      return jobMaps.size();
+      return submittedJobsMap.size();
     }
 
     /**
      * @return runningWatitingJobs
      */
     static Collection<JobStats> getRunningJobStats() {
-      return jobMaps.values();
+      return submittedJobsMap.values();
     }
 
+    /**
+     * Returns the total number of submitted map tasks
+     */
+    static int getSubmittedMapTasks() {
+      return numMapsSubmitted;
+    }
+    
+    /**
+     * Returns the total number of submitted reduce tasks
+     */
+    static int getSubmittedReduceTasks() {
+      return numReducesSubmitted;
+    }
   }
 }

+ 196 - 56
hadoop-mapreduce-project/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java

@@ -25,11 +25,15 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
 import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -87,6 +91,13 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
     "gridmix.throttle.jobs-to-tracker-ratio";  
   final float maxJobTrackerRatio;
 
+  /**
+   * Represents a list of blacklisted jobs. Jobs are blacklisted when either 
+   * they are complete or their status cannot be obtained. Stress mode will 
+   * ignore blacklisted jobs from its overload computation.
+   */
+  private Set<JobID> blacklistedJobs = new HashSet<JobID>();
+  
   /**
    * Creating a new instance does not start the thread.
    *
@@ -145,42 +156,66 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       try {
         startFlag.await();
         if (Thread.currentThread().isInterrupted()) {
+          LOG.warn("[STRESS] Interrupted before start!. Exiting..");
           return;
         }
         LOG.info("START STRESS @ " + System.currentTimeMillis());
         while (!Thread.currentThread().isInterrupted()) {
           try {
             while (loadStatus.overloaded()) {
+              // update the overload status
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Cluster overloaded in run! Sleeping...");
+                LOG.debug("Updating the overload status.");
               }
-              // sleep 
               try {
-                Thread.sleep(1000);
-              } catch (InterruptedException ie) {
+                checkLoadAndGetSlotsToBackfill();
+              } catch (IOException ioe) {
+                LOG.warn("[STRESS] Check failed!", ioe);
                 return;
               }
+              
+              // if the cluster is still overloaded, then sleep
+              if (loadStatus.overloaded()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("[STRESS] Cluster overloaded in run! Sleeping...");
+                }
+
+                // sleep 
+                try {
+                  Thread.sleep(1000);
+                } catch (InterruptedException ie) {
+                  LOG.warn("[STRESS] Interrupted while sleeping! Exiting.", ie);
+                  return;
+                }
+              }
             }
 
             while (!loadStatus.overloaded()) {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Cluster underloaded in run! Stressing...");
+                LOG.debug("[STRESS] Cluster underloaded in run! Stressing...");
               }
               try {
                 //TODO This in-line read can block submission for large jobs.
                 final JobStory job = getNextJobFiltered();
                 if (null == job) {
+                  LOG.warn("[STRESS] Finished consuming the input trace. " 
+                           + "Exiting..");
                   return;
                 }
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("Job Selected: " + job.getJobID());
                 }
-                submitter.add(
-                  jobCreator.createGridmixJob(
-                    conf, 0L, job, scratch, 
-                    userResolver.getTargetUgi(
-                      UserGroupInformation.createRemoteUser(job.getUser())), 
-                    sequence.getAndIncrement()));
+                
+                UserGroupInformation ugi = 
+                  UserGroupInformation.createRemoteUser(job.getUser());
+                UserGroupInformation tgtUgi = userResolver.getTargetUgi(ugi);
+                GridmixJob tJob = 
+                  jobCreator.createGridmixJob(conf, 0L, job, scratch, 
+                               tgtUgi, sequence.getAndIncrement());
+                
+                // submit the job
+                submitter.add(tJob);
+                
                 // TODO: We need to take care of scenario when one map/reduce
                 // takes more than 1 slot.
                 
@@ -198,7 +233,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
                   
                 loadStatus.decrementJobLoad(1);
               } catch (IOException e) {
-                LOG.error("Error while submitting the job ", e);
+                LOG.error("[STRESS] Error while submitting the job ", e);
                 error = e;
                 return;
               }
@@ -209,6 +244,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
           }
         }
       } catch (InterruptedException e) {
+        LOG.error("[STRESS] Interrupted in the main block!", e);
         return;
       } finally {
         IOUtils.cleanup(null, jobProducer);
@@ -224,9 +260,17 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
    */
   @Override
   public void update(Statistics.ClusterStats item) {
-    ClusterStatus clusterMetrics = item.getStatus();
+    ClusterStatus clusterStatus = item.getStatus();
     try {
-      checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
+      // update the max cluster map/reduce task capacity
+      loadStatus.updateMapCapacity(clusterStatus.getMaxMapTasks());
+      
+      loadStatus.updateReduceCapacity(clusterStatus.getMaxReduceTasks());
+      
+      int numTrackers = clusterStatus.getTaskTrackers();
+      int jobLoad = 
+        (int) (maxJobTrackerRatio * numTrackers) - item.getNumRunningJob();
+      loadStatus.updateJobLoad(jobLoad);
     } catch (Exception e) {
       LOG.error("Couldn't get the new Status",e);
     }
@@ -258,22 +302,8 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
    * @param clusterStatus Cluster status
    * @throws java.io.IOException
    */
-  private void checkLoadAndGetSlotsToBackfill(
-    ClusterStats stats, ClusterStatus clusterStatus) throws IOException, InterruptedException {
-    
-    // update the max cluster capacity incase its updated
-    int mapCapacity = clusterStatus.getMaxMapTasks();
-    loadStatus.updateMapCapacity(mapCapacity);
-    
-    int reduceCapacity = clusterStatus.getMaxReduceTasks();
-    
-    loadStatus.updateReduceCapacity(reduceCapacity);
-    
-    int numTrackers = clusterStatus.getTaskTrackers();
-    
-    int jobLoad = 
-      (int) (maxJobTrackerRatio * numTrackers) - stats.getNumRunningJob();
-    loadStatus.updateJobLoad(jobLoad);
+  protected void checkLoadAndGetSlotsToBackfill() 
+  throws IOException, InterruptedException {
     if (loadStatus.getJobLoad() <= 0) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(System.currentTimeMillis() + " [JobLoad] Overloaded is "
@@ -283,17 +313,143 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       return; // stop calculation because we know it is overloaded.
     }
 
-    float incompleteMapTasks = 0; // include pending & running map tasks.
-    for (JobStats job : ClusterStats.getRunningJobStats()) {
-      float mapProgress = job.getJob().mapProgress();
-      int noOfMaps = job.getNoOfMaps();
-      incompleteMapTasks += 
-        calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
+    int mapCapacity = loadStatus.getMapCapacity();
+    int reduceCapacity = loadStatus.getReduceCapacity();
+    
+    // return if the cluster status is not set
+    if (mapCapacity < 0 || reduceCapacity < 0) {
+      // note that, by default, the overload status is true
+      // missing cluster status will result into blocking of job submission
+      return;
     }
     
-    int mapSlotsBackFill = 
-      (int) ((overloadMapTaskMapSlotRatio * mapCapacity) - incompleteMapTasks);
-    loadStatus.updateMapLoad(mapSlotsBackFill);
+    // Determine the max permissible map & reduce task load
+    int maxMapLoad = (int) (overloadMapTaskMapSlotRatio * mapCapacity);
+    int maxReduceLoad = 
+      (int) (overloadReduceTaskReduceSlotRatio * reduceCapacity);
+    
+    // compute the total number of map & reduce tasks submitted
+    int totalMapTasks = ClusterStats.getSubmittedMapTasks();
+    int totalReduceTasks = ClusterStats.getSubmittedReduceTasks();
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total submitted map tasks: " + totalMapTasks);
+      LOG.debug("Total submitted reduce tasks: " + totalReduceTasks);
+      LOG.debug("Max map load: " + maxMapLoad);
+      LOG.debug("Max reduce load: " + maxReduceLoad);
+    }
+    
+    // generate a pessimistic bound on the max running+pending map tasks
+    // this check is to avoid the heavy-duty actual map load calculation
+    int mapSlotsBackFill = (int) (maxMapLoad - totalMapTasks);
+    
+    // generate a pessimistic bound on the max running+pending reduce tasks
+    // this check is to avoid the heavy-duty actual reduce load calculation
+    int reduceSlotsBackFill = (int) (maxReduceLoad - totalReduceTasks);
+    
+    // maintain a list of seen job ids
+    Set<JobID> seenJobIDs = new HashSet<JobID>();
+    
+    // check if the total number of submitted map/reduce tasks exceeds the 
+    // permissible limit
+    if (totalMapTasks > maxMapLoad || totalReduceTasks > maxReduceLoad) {
+      // if yes, calculate the real load
+      float incompleteMapTasks = 0; // include pending & running map tasks.
+      float incompleteReduceTasks = 0; // include pending & running reduce tasks
+      
+      for (JobStats job : ClusterStats.getRunningJobStats()) {
+        JobID id = job.getJob().getJobID();
+        seenJobIDs.add(id);
+        
+        // Note that this is a hack! Ideally, ClusterStats.getRunningJobStats()
+        // should be smart enough to take care of completed jobs.
+        if (blacklistedJobs.contains(id)) {
+          LOG.warn("Ignoring blacklisted job: " + id);
+          continue;
+        }
+        
+        int noOfMaps = job.getNoOfMaps();
+        int noOfReduces = job.getNoOfReds();
+        
+        // consider polling for jobs where maps>0 and reds>0
+        // TODO: What about setup/cleanup tasks for cases where m=0 and r=0
+        //       What otherwise?
+        if (noOfMaps > 0 || noOfReduces > 0) {
+          // get the job's status
+          JobStatus status = job.getJobStatus();
+          
+          // blacklist completed jobs and continue
+          if (status != null && status.isJobComplete()) {
+            LOG.warn("Blacklisting completed job: " + id);
+            blacklistedJobs.add(id);
+            continue;
+          }
+          
+          // get the map and reduce tasks' progress
+          float mapProgress = 0f;
+          float reduceProgress = 0f;
+          
+          // check if the status is missing (this can happen for unpolled jobs)
+          if (status != null) {
+            mapProgress = status.getMapProgress();
+            reduceProgress = status.getReduceProgress();
+          }
+          
+          incompleteMapTasks += 
+            calcEffectiveIncompleteMapTasks(mapCapacity, noOfMaps, mapProgress);
+
+          // bail out early
+          int currentMapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
+          if (currentMapSlotsBackFill <= 0) {
+            // reset the reduce task load since we are bailing out
+            incompleteReduceTasks = totalReduceTasks;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Terminating overload check due to high map load.");
+            }
+            break;
+          }
+
+          // compute the real reduce load
+          if (noOfReduces > 0) {
+            incompleteReduceTasks += 
+              calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces, 
+                  reduceProgress);
+          }
+
+          // bail out early
+          int currentReduceSlotsBackFill = 
+            (int) (maxReduceLoad - incompleteReduceTasks);
+          if (currentReduceSlotsBackFill <= 0) {
+            // reset the map task load since we are bailing out
+            incompleteMapTasks = totalMapTasks;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Terminating overload check due to high reduce load.");
+            }
+            break;
+          }
+        } else {
+          LOG.warn("Blacklisting empty job: " + id);
+          blacklistedJobs.add(id);
+        }
+      }
+
+      // calculate the real map load on the cluster
+      mapSlotsBackFill = (int) (maxMapLoad - incompleteMapTasks);
+      
+      // calculate the real reduce load on the cluster
+      reduceSlotsBackFill = (int)(maxReduceLoad - incompleteReduceTasks);
+      
+      // clean up the backlisted set to keep the memory footprint minimal
+      // retain only the jobs that are seen in this cycle
+      blacklistedJobs.retainAll(seenJobIDs);
+      if (LOG.isDebugEnabled() && blacklistedJobs.size() > 0) {
+        LOG.debug("Blacklisted jobs count: " + blacklistedJobs.size());
+      }
+    }
+    
+    // update
+    loadStatus.updateMapLoad(mapSlotsBackFill); 
+    loadStatus.updateReduceLoad(reduceSlotsBackFill);
     
     if (loadStatus.getMapLoad() <= 0) {
       if (LOG.isDebugEnabled()) {
@@ -303,23 +459,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
       }
       return; // stop calculation because we know it is overloaded.
     }
-
-    float incompleteReduceTasks = 0; // include pending & running reduce tasks.
-    for (JobStats job : ClusterStats.getRunningJobStats()) {
-      // Cached the num-reds value in JobStats
-      int noOfReduces = job.getNoOfReds();
-      if (noOfReduces > 0) {
-        float reduceProgress = job.getJob().reduceProgress();
-        incompleteReduceTasks += 
-          calcEffectiveIncompleteReduceTasks(reduceCapacity, noOfReduces, 
-                                             reduceProgress);
-      }
-    }
     
-    int reduceSlotsBackFill = 
-      (int)((overloadReduceTaskReduceSlotRatio * reduceCapacity) 
-             - incompleteReduceTasks);
-    loadStatus.updateReduceLoad(reduceSlotsBackFill);
     if (loadStatus.getReduceLoad() <= 0) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(System.currentTimeMillis() + " [REDUCE-LOAD] Overloaded is "
@@ -445,7 +585,7 @@ public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
                      || (numJobsBackfill <= 0));
     }
     
-    public synchronized boolean overloaded() {
+    public boolean overloaded() {
       return overloaded.get();
     }
     

+ 217 - 0
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixStatistics.java

@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.gridmix;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Test the Gridmix's {@link Statistics} class.
+ */
+public class TestGridmixStatistics {
+  /**
+   * Test {@link Statistics.JobStats}.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testJobStats() throws Exception {
+    Job job = new Job() {};
+    JobStats stats = new JobStats(1, 2, job);
+    assertEquals("Incorrect num-maps", 1, stats.getNoOfMaps());
+    assertEquals("Incorrect num-reds", 2, stats.getNoOfReds());
+    assertTrue("Incorrect job", job == stats.getJob());
+    assertNull("Unexpected job status", stats.getJobStatus());
+    
+    // add a new status
+    JobStatus status = new JobStatus();
+    stats.updateJobStatus(status);
+    assertNotNull("Missing job status", stats.getJobStatus());
+    assertTrue("Incorrect job status", status == stats.getJobStatus());
+  }
+  
+  private static JobStory getCustomJobStory(final int numMaps, 
+                                            final int numReds) {
+    return new JobStory() {
+      @Override
+      public InputSplit[] getInputSplits() {
+        return null;
+      }
+      @Override
+      public JobConf getJobConf() {
+        return null;
+      }
+      @Override
+      public JobID getJobID() {
+        return null;
+      }
+      @Override
+      public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int arg0, int arg1,
+                                                           int arg2) {
+        return null;
+      }
+      @Override
+      public String getName() {
+        return null;
+      }
+      @Override
+      public int getNumberMaps() {
+        return numMaps;
+      }
+      @Override
+      public int getNumberReduces() {
+        return numReds;
+      }
+      @Override
+      public Values getOutcome() {
+        return null;
+      }
+      @Override
+      public String getQueueName() {
+        return null;
+      }
+      @Override
+      public long getSubmissionTime() {
+        return 0;
+      }
+      @Override
+      public TaskAttemptInfo getTaskAttemptInfo(TaskType arg0, int arg1, 
+                                                int arg2) {
+        return null;
+      }
+      @Override
+      public TaskInfo getTaskInfo(TaskType arg0, int arg1) {
+        return null;
+      }
+      @Override
+      public String getUser() {
+        return null;
+      }
+    };
+  }
+  
+  /**
+   * Test {@link Statistics}.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testStatistics() throws Exception {
+    // test job stats generation
+    Configuration conf = new Configuration();
+    
+    // test dummy jobs like data-generation etc
+    Job job = new Job(conf) {
+    };
+    JobStats stats = Statistics.generateJobStats(job, null);
+    testJobStats(stats, -1, -1, null, job);
+    
+    // add a job desc with 2 map and 1 reduce task
+    conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 1);
+    
+    // test dummy jobs like data-generation etc
+    job = new Job(conf) {
+    };
+    JobStory zjob = getCustomJobStory(2, 1);
+    stats = Statistics.generateJobStats(job, zjob);
+    testJobStats(stats, 2, 1, null, job);
+    
+    // add a job status
+    JobStatus jStatus = new JobStatus();
+    stats.updateJobStatus(jStatus);
+    testJobStats(stats, 2, 1, jStatus, job);
+    
+    
+    // start the statistics
+    CountDownLatch startFlag = new CountDownLatch(1); // prevents the collector
+                                                      // thread from starting
+    Statistics statistics = new Statistics(new JobConf(), 0, startFlag);
+    statistics.start();
+
+    testClusterStats(0, 0, 0);
+    
+    // add to the statistics object
+    statistics.addJobStats(stats);
+    testClusterStats(2, 1, 1);
+    
+    // add another job
+    JobStory zjob2 = getCustomJobStory(10, 5);
+    conf.setInt(GridmixJob.GRIDMIX_JOB_SEQ, 2);
+    job = new Job(conf) {
+    };
+    
+    JobStats stats2 = Statistics.generateJobStats(job, zjob2);
+    statistics.addJobStats(stats2);
+    testClusterStats(12, 6, 2);
+    
+    // finish off one job
+    statistics.add(stats2);
+    testClusterStats(2, 1, 1);
+    
+    // finish off the other job
+    statistics.add(stats);
+    testClusterStats(0, 0, 0);
+    
+    statistics.shutdown();
+  }
+  
+  // test the job stats
+  private static void testJobStats(JobStats stats, int numMaps, int numReds,
+                                   JobStatus jStatus, Job job) {
+    assertEquals("Incorrect num map tasks", numMaps, stats.getNoOfMaps());
+    assertEquals("Incorrect num reduce tasks", numReds, stats.getNoOfReds());
+    
+    if (job != null) {
+      assertNotNull("Missing job", job);
+    }
+    // check running job
+    assertTrue("Incorrect job", job == stats.getJob());
+    
+    if (jStatus != null) {
+      assertNotNull("Missing job status", jStatus);
+    }
+    // check job stats
+    assertTrue("Incorrect job status", jStatus == stats.getJobStatus());
+  }
+  
+  // test the cluster stats
+  private static void testClusterStats(int numSubmittedMapTasks, 
+                                       int numSubmittedReduceTasks, 
+                                       int numSubmittedJobs) {
+    assertEquals("Incorrect count of total number of submitted map tasks", 
+                 numSubmittedMapTasks, ClusterStats.getSubmittedMapTasks());
+    assertEquals("Incorrect count of total number of submitted reduce tasks", 
+                 numSubmittedReduceTasks, 
+                 ClusterStats.getSubmittedReduceTasks());
+    assertEquals("Incorrect submitted jobs", 
+                 numSubmittedJobs, ClusterStats.getRunningJobStats().size());
+  }
+}

+ 3 - 2
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,7 +97,7 @@ public class TestGridmixSubmission {
     private final BlockingQueue<Job> retiredJobs;
 
     public TestMonitor(int expected, Statistics stats) {
-      super(stats);
+      super(5, TimeUnit.SECONDS, stats, 1);
       this.expected = expected;
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
@@ -349,7 +350,7 @@ public class TestGridmixSubmission {
     }
 
     @Override
-    protected JobMonitor createJobMonitor(Statistics stats) {
+    protected JobMonitor createJobMonitor(Statistics stats, Configuration conf){
       monitor = new TestMonitor(NJOBS + 1, stats);
       return monitor;
     }

+ 22 - 7
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSummary.java

@@ -193,7 +193,7 @@ public class TestGridmixSummary {
     es.update(null);
     assertEquals("ExecutionSummarizer init failed", 0, 
                  es.getSimulationStartTime());
-    testExecutionSummarizer(0, 0, 0, 0, 0, 0, es);
+    testExecutionSummarizer(0, 0, 0, 0, 0, 0, 0, es);
     
     long simStartTime = System.currentTimeMillis();
     es.start(null);
@@ -203,14 +203,24 @@ public class TestGridmixSummary {
                es.getSimulationStartTime() <= System.currentTimeMillis());
     
     // test with job stats
-    JobStats stats = generateFakeJobStats(1, 10, true);
+    JobStats stats = generateFakeJobStats(1, 10, true, false);
     es.update(stats);
-    testExecutionSummarizer(1, 10, 0, 1, 1, 0, es);
+    testExecutionSummarizer(1, 10, 0, 1, 1, 0, 0, es);
     
     // test with failed job 
-    stats = generateFakeJobStats(5, 1, false);
+    stats = generateFakeJobStats(5, 1, false, false);
     es.update(stats);
-    testExecutionSummarizer(6, 11, 0, 2, 1, 1, es);
+    testExecutionSummarizer(6, 11, 0, 2, 1, 1, 0, es);
+    
+    // test with successful but lost job 
+    stats = generateFakeJobStats(1, 1, true, true);
+    es.update(stats);
+    testExecutionSummarizer(7, 12, 0, 3, 1, 1, 1, es);
+    
+    // test with failed but lost job 
+    stats = generateFakeJobStats(2, 2, false, true);
+    es.update(stats);
+    testExecutionSummarizer(9, 14, 0, 4, 1, 1, 2, es);
     
     // test finalize
     //  define a fake job factory
@@ -306,7 +316,7 @@ public class TestGridmixSummary {
   // test the ExecutionSummarizer
   private static void testExecutionSummarizer(int numMaps, int numReds,
       int totalJobsInTrace, int totalJobSubmitted, int numSuccessfulJob, 
-      int numFailedJobs, ExecutionSummarizer es) {
+      int numFailedJobs, int numLostJobs, ExecutionSummarizer es) {
     assertEquals("ExecutionSummarizer test failed [num-maps]", 
                  numMaps, es.getNumMapTasksLaunched());
     assertEquals("ExecutionSummarizer test failed [num-reducers]", 
@@ -319,12 +329,14 @@ public class TestGridmixSummary {
                  numSuccessfulJob, es.getNumSuccessfulJobs());
     assertEquals("ExecutionSummarizer test failed [num-failed jobs]", 
                  numFailedJobs, es.getNumFailedJobs());
+    assertEquals("ExecutionSummarizer test failed [num-lost jobs]", 
+                 numLostJobs, es.getNumLostJobs());
   }
   
   // generate fake job stats
   @SuppressWarnings("deprecation")
   private static JobStats generateFakeJobStats(final int numMaps, 
-      final int numReds, final boolean isSuccessful) 
+      final int numReds, final boolean isSuccessful, final boolean lost) 
   throws IOException {
     // A fake job 
     Job fakeJob = new Job() {
@@ -335,6 +347,9 @@ public class TestGridmixSummary {
       
       @Override
       public boolean isSuccessful() throws IOException, InterruptedException {
+        if (lost) {
+          throw new IOException("Test failure!");
+        }
         return isSuccessful;
       };
     };

+ 3 - 2
hadoop-mapreduce-project/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestSleepJob.java

@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -74,7 +75,7 @@ public class TestSleepJob {
     private final int expected;
 
     public TestMonitor(int expected, Statistics stats) {
-      super(stats);
+      super(5, TimeUnit.SECONDS, stats, 1);
       this.expected = expected;
       retiredJobs = new LinkedBlockingQueue<Job>();
     }
@@ -102,7 +103,7 @@ public class TestSleepJob {
     private TestMonitor monitor;
 
     @Override
-    protected JobMonitor createJobMonitor(Statistics stats) {
+    protected JobMonitor createJobMonitor(Statistics stats, Configuration c) {
       monitor = new TestMonitor(NJOBS + 1, stats);
       return monitor;
     }

+ 16 - 0
hadoop-mapreduce-project/src/docs/src/documentation/content/xdocs/gridmix.xml

@@ -206,6 +206,22 @@ hadoop jar &lt;gridmix-jar&gt; org.apache.hadoop.mapred.gridmix.Gridmix \
               options using the values obtained from the original task (i.e via
               trace).
           </td>
+          </tr>
+          <tr>
+          <td>
+            <code>gridmix.job-monitor.thread-count</code>
+          </td>
+          <td>Total number of threads to use for polling for jobs' status. The
+              default value is 1.
+          </td>
+        </tr>
+        <tr>
+          <td>
+            <code>gridmix.job-monitor.sleep-time-ms</code>
+          </td>
+          <td>The time each Gridmix status poller thread will sleep before
+              starting the next cycle. The default value is 500 milliseconds.
+          </td>
         </tr>
       </table>
     </section>