Преглед изворни кода

MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides client APIs across MR1 and MR2. (Ahmed via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-205@1203419 13f79535-47bb-0310-9956-ffa450edef68
Alejandro Abdelnur пре 13 година
родитељ
комит
a02bac4b64

+ 3 - 0
CHANGES.txt

@@ -39,6 +39,9 @@ Release 0.20.205.1 - unreleased
     HADOOP-7816. Allow HADOOP_HOME deprecated warning suppression based 
     on config specified in hadoop-env.sh (Dave Thompson via suresh)
 
+    MAPREDUCE-3169. Create a new MiniMRCluster equivalent which only provides
+    client APIs across MR1 and MR2. (Ahmed via tucu)
+
   BUG FIXES
 
     HADOOP-7815. Fixed configuring map memory mb in hadoop-setup-conf.sh. 

+ 38 - 0
src/test/org/apache/hadoop/mapred/MiniMRClientCluster.java

@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/*
+ * A simple interface for a client MR cluster used for testing. This interface
+ * provides basic methods which are independent of the underlying Mini Cluster (
+ * either through MR1 or MR2).
+ */
+public interface MiniMRClientCluster {
+
+  public void start() throws IOException;
+
+  public void stop() throws IOException;
+
+  public Configuration getConfig() throws IOException;
+
+}

+ 42 - 0
src/test/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+/**
+ * A MiniMRCluster factory. In MR2, it provides a wrapper MiniMRClientCluster
+ * interface around the MiniMRYarnCluster. While in MR1, it provides such
+ * wrapper around MiniMRCluster. This factory should be used in tests to provide
+ * an easy migration of tests across MR1 and MR2.
+ */
+public class MiniMRClientClusterFactory {
+
+  public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
+      Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    MiniMRCluster miniMRCluster = new MiniMRCluster(noOfNMs, fs.getUri()
+        .toString(), 1);
+    return new MiniMRClusterAdapter(miniMRCluster);
+  }
+
+}

+ 53 - 0
src/test/org/apache/hadoop/mapred/MiniMRClusterAdapter.java

@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An adapter for MiniMRCluster providing a MiniMRClientCluster interface. This
+ * interface could be used by tests across both MR1 and MR2.
+ */
+public class MiniMRClusterAdapter implements MiniMRClientCluster {
+
+  private MiniMRCluster miniMRCluster;
+
+  public MiniMRClusterAdapter(MiniMRCluster miniMRCluster) {
+    this.miniMRCluster = miniMRCluster;
+  }
+
+  @Override
+  public Configuration getConfig() throws IOException {
+    return miniMRCluster.createJobConf();
+  }
+
+  @Override
+  public void start() throws IOException {
+    miniMRCluster.startJobTracker();
+    miniMRCluster.startTaskTracker(null, null, 0, 1);
+  }
+
+  @Override
+  public void stop() throws IOException {
+    miniMRCluster.shutdown();
+  }
+
+}

+ 170 - 0
src/test/org/apache/hadoop/mapred/TestMiniMRClientCluster.java

@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Basic testing for the MiniMRClientCluster. This test shows an example class
+ * that can be used in MR1 or MR2, without any change to the test. The test will
+ * use MiniMRYarnCluster in MR2, and MiniMRCluster in MR1.
+ */
+public class TestMiniMRClientCluster {
+
+  private static Path inDir = null;
+  private static Path outDir = null;
+  private static Path testdir = null;
+  private static Path[] inFiles = new Path[5];
+  private static MiniMRClientCluster mrCluster;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    final Configuration conf = new Configuration();
+    final Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+        "/tmp"));
+    testdir = new Path(TEST_ROOT_DIR, "TestMiniMRClientCluster");
+    inDir = new Path(testdir, "in");
+    outDir = new Path(testdir, "out");
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    if (fs.exists(testdir) && !fs.delete(testdir, true)) {
+      throw new IOException("Could not delete " + testdir);
+    }
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir);
+    }
+
+    for (int i = 0; i < inFiles.length; i++) {
+      inFiles[i] = new Path(inDir, "part_" + i);
+      createFile(inFiles[i], conf);
+    }
+
+    // create the mini cluster to be used for the tests
+    mrCluster = MiniMRClientClusterFactory.create(
+        TestMiniMRClientCluster.class, 1, new Configuration());
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    // clean up the input and output files
+    final Configuration conf = new Configuration();
+    final FileSystem fs = testdir.getFileSystem(conf);
+    if (fs.exists(testdir)) {
+      fs.delete(testdir, true);
+    }
+    // stopping the mini cluster
+    mrCluster.stop();
+  }
+
+  @Test
+  public void testJob() throws Exception {
+    final Job job = createJob();
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job,
+        inDir);
+    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
+        new Path(outDir, "testJob"));
+    assertTrue(job.waitForCompletion(true));
+    validateCounters(job.getCounters(), 5, 25, 5, 5);
+  }
+
+  private void validateCounters(Counters counters, long mapInputRecords,
+      long mapOutputRecords, long reduceInputGroups, long reduceOutputRecords) {
+    assertEquals("MapInputRecords", mapInputRecords, counters.findCounter(
+        "MyCounterGroup", "MAP_INPUT_RECORDS").getValue());
+    assertEquals("MapOutputRecords", mapOutputRecords, counters.findCounter(
+        "MyCounterGroup", "MAP_OUTPUT_RECORDS").getValue());
+    assertEquals("ReduceInputGroups", reduceInputGroups, counters.findCounter(
+        "MyCounterGroup", "REDUCE_INPUT_GROUPS").getValue());
+    assertEquals("ReduceOutputRecords", reduceOutputRecords, counters
+        .findCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS").getValue());
+  }
+
+  private static void createFile(Path inFile, Configuration conf)
+      throws IOException {
+    final FileSystem fs = inFile.getFileSystem(conf);
+    if (fs.exists(inFile)) {
+      return;
+    }
+    FSDataOutputStream out = fs.create(inFile);
+    out.writeBytes("This is a test file");
+    out.close();
+  }
+
+  public static Job createJob() throws IOException {
+    final Job baseJob = new Job(mrCluster.getConfig());
+    baseJob.setOutputKeyClass(Text.class);
+    baseJob.setOutputValueClass(IntWritable.class);
+    baseJob.setMapperClass(MyMapper.class);
+    baseJob.setReducerClass(MyReducer.class);
+    baseJob.setNumReduceTasks(1);
+    return baseJob;
+  }
+
+  public static class MyMapper extends
+      org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+      context.getCounter("MyCounterGroup", "MAP_INPUT_RECORDS").increment(1);
+      StringTokenizer iter = new StringTokenizer(value.toString());
+      while (iter.hasMoreTokens()) {
+        word.set(iter.nextToken());
+        context.write(word, one);
+        context.getCounter("MyCounterGroup", "MAP_OUTPUT_RECORDS").increment(1);
+      }
+    }
+  }
+
+  public static class MyReducer extends
+      org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {
+    private IntWritable result = new IntWritable();
+
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+      context.getCounter("MyCounterGroup", "REDUCE_INPUT_GROUPS").increment(1);
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+      result.set(sum);
+      context.write(key, result);
+      context.getCounter("MyCounterGroup", "REDUCE_OUTPUT_RECORDS")
+          .increment(1);
+    }
+  }
+
+}