Bläddra i källkod

MAPREDUCE-4645. Provide a random seed to Slive to make the sequence of file names deterministic. Contributed by Ravi Prakash.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1389602 13f79535-47bb-0310-9956-ffa450edef68
Konstantin Shvachko 12 år sedan
förälder
incheckning
b08b6e21f8

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -8,6 +8,9 @@ Release 0.23.4 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4645. Provide a random seed to Slive to make the sequence
+    of file names deterministic. (Ravi Prakash via shv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 3 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/Operation.java

@@ -41,7 +41,9 @@ abstract class Operation {
     this.config = cfg;
     this.type = type;
     this.rnd = rnd;
-    this.finder = new PathFinder(cfg, rnd);
+    // Use a new Random instance so that the sequence of file names produced is
+    // the same even in case of unsuccessful operations
+    this.finder = new PathFinder(cfg, new Random(rnd.nextInt()));
   }
 
   /**

+ 15 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/slive/SliveMapper.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -50,8 +52,7 @@ public class SliveMapper extends MapReduceBase implements
 
   private FileSystem filesystem;
   private ConfigExtractor config;
-  private WeightSelector selector;
-  private Random rnd;
+  private int taskId;
 
   /*
    * (non-Javadoc)
@@ -70,19 +71,19 @@ public class SliveMapper extends MapReduceBase implements
     }
     try {
       config = new ConfigExtractor(conf);
-      Long rndSeed = config.getRandomSeed();
-      if (rndSeed != null) {
-        rnd = new Random(rndSeed);
-      } else {
-        rnd = new Random();
-      }
-      selector = new WeightSelector(config, rnd);
       ConfigExtractor.dumpOptions(config);
     } catch (Exception e) {
       LOG.error("Unable to setup slive " + StringUtils.stringifyException(e));
       throw new RuntimeException("Unable to setup slive configuration", e);
     }
-
+    if(conf.get(MRJobConfig.TASK_ATTEMPT_ID) != null ) {
+      this.taskId = TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID))
+        .getTaskID().getId();
+    } else {
+      // So that branch-1/0.20 can run this same code as well
+      this.taskId = TaskAttemptID.forName(conf.get("mapred.task.id"))
+          .getTaskID().getId();
+    }
   }
 
   /**
@@ -94,15 +95,6 @@ public class SliveMapper extends MapReduceBase implements
     return config;
   }
 
-  /**
-   * Gets the operation selector to use for this object
-   * 
-   * @return WeightSelector
-   */
-  private WeightSelector getSelector() {
-    return selector;
-  }
-
   /**
    * Logs to the given reporter and logs to the internal logger at info level
    * 
@@ -154,6 +146,10 @@ public class SliveMapper extends MapReduceBase implements
       Reporter reporter) throws IOException {
     logAndSetStatus(reporter, "Running slive mapper for dummy key " + key
         + " and dummy value " + value);
+    //Add taskID to randomSeed to deterministically seed rnd.
+    Random rnd = config.getRandomSeed() != null ?
+      new Random(this.taskId + config.getRandomSeed()) : new Random();
+    WeightSelector selector = new WeightSelector(config, rnd);
     long startTime = Timer.now();
     long opAm = 0;
     long sleepOps = 0;
@@ -163,7 +159,6 @@ public class SliveMapper extends MapReduceBase implements
     if (sleepRange != null) {
       sleeper = new SleepOp(getConfig(), rnd);
     }
-    WeightSelector selector = getSelector();
     while (Timer.elapsed(startTime) < duration) {
       try {
         logAndSetStatus(reporter, "Attempting to select operation #"