
MAPREDUCE-4905. test org.apache.hadoop.mapred.pipes (Aleksey Gorshkov via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1443027 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans, 12 years ago
Commit
79b12f6f4c

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -685,6 +685,9 @@ Release 0.23.7 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4905. test org.apache.hadoop.mapred.pipes 
+    (Aleksey Gorshkov via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.lib.LazyOutputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 
@@ -515,7 +516,7 @@ public class Submitter extends Configured implements Tool {
    */
   public static void main(String[] args) throws Exception {
     int exitCode =  new Submitter().run(args);
-    System.exit(exitCode);
+    ExitUtil.terminate(exitCode);
   }
 
 }
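The switch from System.exit() to ExitUtil.terminate() is what makes Submitter.main() testable: once a test calls ExitUtil.disableSystemExit(), terminate() throws ExitUtil.ExitException instead of killing the JVM, and the test can assert on the exit status. A minimal sketch of the pattern (the class name is hypothetical; the testSubmitter() test added below follows the same pattern with real assertions):

import org.apache.hadoop.util.ExitUtil;

// Hypothetical illustration of the ExitUtil testing pattern used by this patch.
public class ExitUtilSketch {
  public static void main(String[] args) throws Exception {
    // Make ExitUtil.terminate() throw instead of exiting the JVM.
    ExitUtil.disableSystemExit();
    try {
      org.apache.hadoop.mapred.pipes.Submitter.main(new String[0]);
    } catch (ExitUtil.ExitException e) {
      // e.status carries the code that would have been passed to System.exit().
      System.out.println("Submitter exited with status " + e.status);
    }
  }
}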

+ 153 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/CommonStub.java

@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+
+public class CommonStub {
+
+  protected Socket socket = null;
+  protected DataInputStream dataInput;
+  protected DataOutputStream dataOut;
+
+  protected String createDigest(byte[] password, String data) throws IOException {
+    SecretKey key = JobTokenSecretManager.createSecretKey(password);
+
+    return SecureShuffleUtils.hashFromString(data, key);
+
+  }
+
+  protected void readObject(Writable obj, DataInputStream inStream) throws IOException {
+    int numBytes = WritableUtils.readVInt(inStream);
+    byte[] buffer;
+    // For BytesWritable and Text, use the specified length to set the length
+    // of the value; this causes the "obvious" translations to work, so that
+    // if you emit a string "abc" from C++, it shows up here as "abc".
+    if (obj instanceof BytesWritable) {
+      buffer = new byte[numBytes];
+      inStream.readFully(buffer);
+      ((BytesWritable) obj).set(buffer, 0, numBytes);
+    } else if (obj instanceof Text) {
+      buffer = new byte[numBytes];
+      inStream.readFully(buffer);
+      ((Text) obj).set(buffer);
+    } else {
+      obj.readFields(inStream);
+    }
+  }
+
+
+  protected void writeObject(Writable obj, DataOutputStream stream)
+          throws IOException {
+    // For Text and BytesWritable, encode them directly, so that they end up
+    // in C++ as the natural translations.
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    if (obj instanceof Text) {
+      Text t = (Text) obj;
+      int len = t.getLength();
+      WritableUtils.writeVLong(stream, len);
+      stream.flush();
+
+      stream.write(t.getBytes(), 0, len);
+      stream.flush();
+
+    } else if (obj instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) obj;
+      int len = b.getLength();
+      WritableUtils.writeVLong(stream, len);
+      stream.write(b.getBytes(), 0, len);
+    } else {
+      buffer.reset();
+      obj.write(buffer);
+      int length = buffer.getLength();
+
+      WritableUtils.writeVInt(stream, length);
+      stream.write(buffer.getData(), 0, length);
+    }
+    stream.flush();
+
+  }
+
+  protected void initSoket() throws Exception {
+    int port = Integer.parseInt(System.getenv("mapreduce.pipes.command.port"));
+
+    java.net.InetAddress address = java.net.InetAddress.getLocalHost();
+
+    socket = new Socket(address.getHostName(), port);
+    InputStream input = socket.getInputStream();
+    OutputStream output = socket.getOutputStream();
+
+    // try to read
+    dataInput = new DataInputStream(input);
+
+    WritableUtils.readVInt(dataInput);
+
+    String str = Text.readString(dataInput);
+
+    Text.readString(dataInput);
+
+    dataOut = new DataOutputStream(output);
+    WritableUtils.writeVInt(dataOut, 57);
+    String s = createDigest("password".getBytes(), str);
+
+    Text.writeString(dataOut, s);
+
+    // start
+    WritableUtils.readVInt(dataInput);
+    int cuttentAnswer = WritableUtils.readVInt(dataInput);
+    System.out.println("CURRENT_PROTOCOL_VERSION:" + cuttentAnswer);
+
+    // get configuration
+    // should be MessageType.SET_JOB_CONF.code
+    WritableUtils.readVInt(dataInput);
+
+    // number of configuration strings that follow (keys and values flattened)
+
+    int j = WritableUtils.readVInt(dataInput);
+    for (int i = 0; i < j; i++) {
+      Text.readString(dataInput);
+      i++;
+      Text.readString(dataInput);
+    }
+  }
+
+  protected void closeSoket() {
+    if (socket != null) {
+      try {
+        socket.close();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
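The readObject()/writeObject() helpers above mirror the framing the pipes binary protocol uses for Text and BytesWritable: a variable-length integer carrying the byte count, followed by the raw bytes, so a string emitted on the C++ side arrives unchanged on the Java side. A self-contained round trip of that framing is sketched below (the class name is hypothetical; writeObject() happens to use writeVLong() for the length, which produces the same encoding as writeVInt() for lengths of this size):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;

public class TextFramingSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);

    // Write side (compare CommonStub.writeObject): vint length, then raw bytes.
    Text outText = new Text("abc");
    WritableUtils.writeVInt(out, outText.getLength());
    out.write(outText.getBytes(), 0, outText.getLength());
    out.flush();

    // Read side (compare CommonStub.readObject): vint length, readFully, Text.set.
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    int len = WritableUtils.readVInt(in);
    byte[] buf = new byte[len];
    in.readFully(buf);
    Text inText = new Text();
    inText.set(buf);

    System.out.println(inText);  // prints "abc"
  }
}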

+ 87 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationRunnableStub.java

@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/*
+ * Stub for the TestPipeApplication test. This stub produces test data for the
+ * main test; the main test verifies that data.
+ */
+
+public class PipeApplicationRunnableStub extends CommonStub {
+
+  public static void main(String[] args) {
+    PipeApplicationRunnableStub client = new PipeApplicationRunnableStub();
+    client.binaryProtocolStub();
+  }
+
+  public void binaryProtocolStub() {
+    try {
+
+      initSoket();
+      System.out.println("start OK");
+
+      // RUN_MAP.code
+      // should be 3
+
+      int answer = WritableUtils.readVInt(dataInput);
+      System.out.println("RunMap:" + answer);
+      TestPipeApplication.FakeSplit split = new TestPipeApplication.FakeSplit();
+      readObject(split, dataInput);
+
+      WritableUtils.readVInt(dataInput);
+      WritableUtils.readVInt(dataInput);
+      // end runMap
+      // get InputTypes
+      WritableUtils.readVInt(dataInput);
+      String inText = Text.readString(dataInput);
+      System.out.println("Key class:" + inText);
+      inText = Text.readString(dataInput);
+      System.out.println("Value class:" + inText);
+
+      @SuppressWarnings("unused")
+      int inCode = 0;
+
+      // read all data from sender and write to output
+      while ((inCode = WritableUtils.readVInt(dataInput)) == 4) {
+        FloatWritable key = new FloatWritable();
+        NullWritable value = NullWritable.get();
+        readObject(key, dataInput);
+        System.out.println("value:" + key.get());
+        readObject(value, dataInput);
+      }
+
+      WritableUtils.writeVInt(dataOut, 54);
+
+      dataOut.flush();
+      dataOut.close();
+
+    } catch (Exception x) {
+      x.printStackTrace();
+    } finally {
+      closeSoket();
+    }
+
+  }
+
+}

+ 101 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationStub.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+/*
+ * Stub for the TestPipeApplication test. This stub produces test data for the
+ * main test; the main test verifies that data.
+ */
+
+public class PipeApplicationStub extends CommonStub {
+
+  public static void main(String[] args) {
+    PipeApplicationStub client = new PipeApplicationStub();
+    client.binaryProtocolStub();
+  }
+
+  public void binaryProtocolStub() {
+    try {
+
+      initSoket();
+
+      // output code
+      WritableUtils.writeVInt(dataOut, 50);
+      IntWritable wt = new IntWritable();
+      wt.set(123);
+      writeObject(wt, dataOut);
+      writeObject(new Text("value"), dataOut);
+
+      //  PARTITIONED_OUTPUT
+      WritableUtils.writeVInt(dataOut, 51);
+      WritableUtils.writeVInt(dataOut, 0);
+      writeObject(wt, dataOut);
+      writeObject(new Text("value"), dataOut);
+
+
+      // STATUS
+      WritableUtils.writeVInt(dataOut, 52);
+      Text.writeString(dataOut, "PROGRESS");
+      dataOut.flush();
+
+      // progress
+      WritableUtils.writeVInt(dataOut, 53);
+      dataOut.writeFloat(0.55f);
+      // register counter
+      WritableUtils.writeVInt(dataOut, 55);
+      // id
+      WritableUtils.writeVInt(dataOut, 0);
+      Text.writeString(dataOut, "group");
+      Text.writeString(dataOut, "name");
+      // increment counter
+      WritableUtils.writeVInt(dataOut, 56);
+      WritableUtils.writeVInt(dataOut, 0);
+
+      WritableUtils.writeVLong(dataOut, 2);
+
+      // map item
+      int intValue = WritableUtils.readVInt(dataInput);
+      System.out.println("intValue:" + intValue);
+      IntWritable iw = new IntWritable();
+      readObject(iw, dataInput);
+      System.out.println("key:" + iw.get());
+      Text txt = new Text();
+      readObject(txt, dataInput);
+      System.out.println("value:" + txt.toString());
+
+      // done
+      // end of session
+      WritableUtils.writeVInt(dataOut, 54);
+
+      System.out.println("finish");
+      dataOut.flush();
+      dataOut.close();
+
+    } catch (Exception x) {
+      x.printStackTrace();
+    } finally {
+      closeSoket();
+    }
+  }
+
+
+}

+ 80 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeReducerStub.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+
+/*
+ * Stub for the TestPipeApplication test. This stub produces test data for the
+ * main test; the main test verifies that data.
+ */
+
+public class PipeReducerStub extends CommonStub {
+
+  public static void main(String[] args) {
+    PipeReducerStub client = new PipeReducerStub();
+    client.binaryProtocolStub();
+  }
+
+  public void binaryProtocolStub() {
+    try {
+
+      initSoket();
+
+      // RUN_REDUCE: message code (should be 5), then the reduce task number,
+      // then the isJavaRecordWriter flag
+      WritableUtils.readVInt(dataInput);
+      WritableUtils.readVInt(dataInput);
+      int intValue = WritableUtils.readVInt(dataInput);
+      System.out.println("getIsJavaRecordWriter:" + intValue);
+
+      // reduce key
+      WritableUtils.readVInt(dataInput);
+      // value of reduce key
+      BooleanWritable value = new BooleanWritable();
+      readObject(value, dataInput);
+      System.out.println("reducer key :" + value);
+      // reduce value code:
+
+      // reduce values
+      while ((intValue = WritableUtils.readVInt(dataInput)) == 7) {
+        Text txt = new Text();
+        // value
+        readObject(txt, dataInput);
+        System.out.println("reduce value  :" + txt);
+      }
+
+
+      // done
+      WritableUtils.writeVInt(dataOut, 54);
+
+      dataOut.flush();
+      dataOut.close();
+
+    } catch (Exception x) {
+      x.printStackTrace();
+    } finally {
+      closeSoket();
+
+    }
+  }
+
+}
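The three stubs above drive the pipes binary protocol with bare integer message codes. For readability, the codes as exercised by these tests are collected below; the constant names are assumptions inferred from the comments in the stubs, not taken verbatim from the Hadoop source.

// Hypothetical summary of the message codes used by the test stubs.
public final class AssumedMessageCodes {
  public static final int RUN_MAP = 3;              // read by PipeApplicationRunnableStub
  public static final int MAP_ITEM = 4;             // key/value pairs streamed to the map stub
  public static final int RUN_REDUCE = 5;           // read by PipeReducerStub
  public static final int REDUCE_VALUE = 7;         // loop condition in PipeReducerStub
  public static final int OUTPUT = 50;              // written by PipeApplicationStub
  public static final int PARTITIONED_OUTPUT = 51;
  public static final int STATUS = 52;
  public static final int PROGRESS = 53;
  public static final int DONE = 54;                // "end of session" in every stub
  public static final int REGISTER_COUNTER = 55;
  public static final int INCREMENT_COUNTER = 56;
  public static final int AUTHENTICATION_RESP = 57; // written during the CommonStub handshake

  private AssumedMessageCodes() {}
}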

+ 747 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java

@@ -0,0 +1,747 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestPipeApplication {
+  private static File workSpace = new File("target",
+          TestPipeApplication.class.getName() + "-workSpace");
+
+  private static String taskName = "attempt_001_02_r03_04_05";
+
+  /**
+   * test PipesMapRunner: test the transfer of data from the reader
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRunner() throws Exception {
+
+    // clean old password files
+    File[] psw = cleanTokenPasswordFile();
+    try {
+      RecordReader<FloatWritable, NullWritable> rReader = new ReaderPipesMapRunner();
+      JobConf conf = new JobConf();
+      conf.set(Submitter.IS_JAVA_RR, "true");
+      // for stdout and stderr
+
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
+
+      CombineOutputCollector<IntWritable, Text> output = new CombineOutputCollector<IntWritable, Text>(
+              new Counters.Counter(), new Progress());
+      FileSystem fs = new RawLocalFileSystem();
+      fs.setConf(conf);
+      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
+              new Path(workSpace + File.separator + "outfile"), IntWritable.class,
+              Text.class, null, null);
+      output.setWriter(wr);
+      // stub for client
+      File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
+
+      conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
+      // token for authorization
+      Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
+              "user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
+              "service"));
+      conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
+      conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
+      TestTaskReporter reporter = new TestTaskReporter();
+      PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text> runner = new PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text>();
+
+      initStdOut(conf);
+
+      runner.configure(conf);
+      runner.run(rReader, output, reporter);
+
+      String stdOut = readStdOut(conf);
+
+      // test part of the transferred data; the client's stdOut is the file
+      // shared between the client and the test
+      // check version
+      assertTrue(stdOut.contains("CURRENT_PROTOCOL_VERSION:0"));
+      // check key and value classes
+      assertTrue(stdOut
+              .contains("Key class:org.apache.hadoop.io.FloatWritable"));
+      assertTrue(stdOut
+              .contains("Value class:org.apache.hadoop.io.NullWritable"));
+      // verify that all data from the reader has been sent
+      assertTrue(stdOut.contains("value:0.0"));
+      assertTrue(stdOut.contains("value:9.0"));
+
+    } finally {
+      if (psw != null) {
+        // remove password files
+        for (File file : psw) {
+          file.deleteOnExit();
+        }
+      }
+
+    }
+  }
+
+  /**
+   * test org.apache.hadoop.mapred.pipes.Application
+   * test internal functions: MessageType.REGISTER_COUNTER, INCREMENT_COUNTER, STATUS, PROGRESS...
+   *
+   * @throws Throwable
+   */
+
+  @Test
+  public void testApplication() throws Throwable {
+    JobConf conf = new JobConf();
+
+    RecordReader<FloatWritable, NullWritable> rReader = new Reader();
+
+    // client for test
+    File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationStub");
+
+    TestTaskReporter reporter = new TestTaskReporter();
+
+    File[] psw = cleanTokenPasswordFile();
+    try {
+
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
+      conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
+
+      // token for authorization
+      Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
+              "user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
+              "service"));
+
+      conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
+      FakeCollector output = new FakeCollector(new Counters.Counter(),
+              new Progress());
+      FileSystem fs = new RawLocalFileSystem();
+      fs.setConf(conf);
+      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
+              new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
+              IntWritable.class, Text.class, null, null);
+      output.setWriter(wr);
+      conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
+
+      Application<WritableComparable<IntWritable>, Writable, IntWritable, Text> application = new Application<WritableComparable<IntWritable>, Writable, IntWritable, Text>(
+              conf, rReader, output, reporter, IntWritable.class, Text.class);
+      application.getDownlink().flush();
+
+      application.getDownlink().mapItem(new IntWritable(3), new Text("txt"));
+
+      application.getDownlink().flush();
+
+      application.waitForFinish();
+
+      wr.close();
+
+      // test getDownlink().mapItem();
+      String stdOut = readStdOut(conf);
+      assertTrue(stdOut.contains("key:3"));
+      assertTrue(stdOut.contains("value:txt"));
+
+      // reporter test: the counter and status should have been sent
+      // test MessageType.REGISTER_COUNTER and INCREMENT_COUNTER
+      assertEquals(1.0, reporter.getProgress(), 0.01);
+      assertNotNull(reporter.getCounter("group", "name"));
+      // test status MessageType.STATUS
+      assertEquals(reporter.getStatus(), "PROGRESS");
+      stdOut = readFile(new File(workSpace.getAbsolutePath() + File.separator
+              + "outfile"));
+      // check MessageType.PROGRESS
+      assertEquals(0.55f, rReader.getProgress(), 0.001);
+      application.getDownlink().close();
+      // test MessageType.OUTPUT
+      Entry<IntWritable, Text> entry = output.getCollect().entrySet()
+              .iterator().next();
+      assertEquals(123, entry.getKey().get());
+      assertEquals("value", entry.getValue().toString());
+      try {
+        // try to abort
+        application.abort(new Throwable());
+        fail();
+      } catch (IOException e) {
+        // abort works ?
+        assertEquals("pipe child exception", e.getMessage());
+      }
+    } finally {
+      if (psw != null) {
+        // remove password files
+        for (File file : psw) {
+          file.deleteOnExit();
+        }
+      }
+    }
+  }
+
+  /**
+   * test org.apache.hadoop.mapred.pipes.Submitter
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSubmitter() throws Exception {
+
+    JobConf conf = new JobConf();
+
+    File[] psw = cleanTokenPasswordFile();
+
+    System.setProperty("test.build.data",
+            "target/tmp/build/TEST_SUBMITTER_MAPPER/data");
+    conf.set("hadoop.log.dir", "target/tmp");
+
+    // prepare configuration
+    Submitter.setIsJavaMapper(conf, false);
+    Submitter.setIsJavaReducer(conf, false);
+    Submitter.setKeepCommandFile(conf, false);
+    Submitter.setIsJavaRecordReader(conf, false);
+    Submitter.setIsJavaRecordWriter(conf, false);
+    PipesPartitioner<IntWritable, Text> partitioner = new PipesPartitioner<IntWritable, Text>();
+    partitioner.configure(conf);
+
+    Submitter.setJavaPartitioner(conf, partitioner.getClass());
+
+    assertEquals(PipesPartitioner.class, (Submitter.getJavaPartitioner(conf)));
+    // the test calls main(), which would normally call System.exit(); ExitUtil intercepts it
+    SecurityManager securityManager = System.getSecurityManager();
+    // store System.out
+    PrintStream oldps = System.out;
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    ExitUtil.disableSystemExit();
+    // test without parameters
+    try {
+      System.setOut(new PrintStream(out));
+      Submitter.main(new String[0]);
+      fail();
+    } catch (ExitUtil.ExitException e) {
+      // System.exit is disabled; verify the usage message
+      assertTrue(out.toString().contains(""));
+      assertTrue(out.toString().contains("bin/hadoop pipes"));
+      assertTrue(out.toString().contains("[-input <path>] // Input directory"));
+      assertTrue(out.toString()
+              .contains("[-output <path>] // Output directory"));
+      assertTrue(out.toString().contains("[-jar <jar file> // jar filename"));
+      assertTrue(out.toString().contains(
+              "[-inputformat <class>] // InputFormat class"));
+      assertTrue(out.toString().contains("[-map <class>] // Java Map class"));
+      assertTrue(out.toString().contains(
+              "[-partitioner <class>] // Java Partitioner"));
+      assertTrue(out.toString().contains(
+              "[-reduce <class>] // Java Reduce class"));
+      assertTrue(out.toString().contains(
+              "[-writer <class>] // Java RecordWriter"));
+      assertTrue(out.toString().contains(
+              "[-program <executable>] // executable URI"));
+      assertTrue(out.toString().contains(
+              "[-reduces <num>] // number of reduces"));
+      assertTrue(out.toString().contains(
+              "[-lazyOutput <true/false>] // createOutputLazily"));
+
+      assertTrue(out
+              .toString()
+              .contains(
+                      "-conf <configuration file>     specify an application configuration file"));
+      assertTrue(out.toString().contains(
+              "-D <property=value>            use value for given property"));
+      assertTrue(out.toString().contains(
+              "-fs <local|namenode:port>      specify a namenode"));
+      assertTrue(out.toString().contains(
+              "-jt <local|jobtracker:port>    specify a job tracker"));
+      assertTrue(out
+              .toString()
+              .contains(
+                      "-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster"));
+      assertTrue(out
+              .toString()
+              .contains(
+                      "-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath."));
+      assertTrue(out
+              .toString()
+              .contains(
+                      "-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines."));
+    } finally {
+      System.setOut(oldps);
+      // restore
+      System.setSecurityManager(securityManager);
+      if (psw != null) {
+        // remove password files
+        for (File file : psw) {
+          file.deleteOnExit();
+        }
+      }
+    }
+    // test calling Submitter from the command line
+    try {
+      File fCommand = getFileCommand(null);
+      String[] args = new String[22];
+      File input = new File(workSpace + File.separator + "input");
+      if (!input.exists()) {
+        Assert.assertTrue(input.createNewFile());
+      }
+      File outPut = new File(workSpace + File.separator + "output");
+      FileUtil.fullyDelete(outPut);
+
+      args[0] = "-input";
+      args[1] = input.getAbsolutePath();// "input";
+      args[2] = "-output";
+      args[3] = outPut.getAbsolutePath();// "output";
+      args[4] = "-inputformat";
+      args[5] = "org.apache.hadoop.mapred.TextInputFormat";
+      args[6] = "-map";
+      args[7] = "org.apache.hadoop.mapred.lib.IdentityMapper";
+      args[8] = "-partitioner";
+      args[9] = "org.apache.hadoop.mapred.pipes.PipesPartitioner";
+      args[10] = "-reduce";
+      args[11] = "org.apache.hadoop.mapred.lib.IdentityReducer";
+      args[12] = "-writer";
+      args[13] = "org.apache.hadoop.mapred.TextOutputFormat";
+      args[14] = "-program";
+      args[15] = fCommand.getAbsolutePath();// "program";
+      args[16] = "-reduces";
+      args[17] = "2";
+      args[18] = "-lazyOutput";
+      args[19] = "lazyOutput";
+      args[20] = "-jobconf";
+      args[21] = "mapreduce.pipes.isjavarecordwriter=false,mapreduce.pipes.isjavarecordreader=false";
+
+      Submitter.main(args);
+      fail();
+    } catch (ExitUtil.ExitException e) {
+      // status should be 0
+      assertEquals(e.status, 0);
+
+    } finally {
+      System.setOut(oldps);
+      System.setSecurityManager(securityManager);
+    }
+
+  }
+
+  /**
+   * test org.apache.hadoop.mapred.pipes.PipesReducer
+   * test the transfer of data: key and value
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPipesReduser() throws Exception {
+
+    File[] psw = cleanTokenPasswordFile();
+    JobConf conf = new JobConf();
+    try {
+      Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
+              "user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
+              "service"));
+      conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
+
+      File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeReducerStub");
+      conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());
+
+      PipesReducer<BooleanWritable, Text, IntWritable, Text> reducer = new PipesReducer<BooleanWritable, Text, IntWritable, Text>();
+      reducer.configure(conf);
+      BooleanWritable bw = new BooleanWritable(true);
+
+      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
+      initStdOut(conf);
+      conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
+      CombineOutputCollector<IntWritable, Text> output = new CombineOutputCollector<IntWritable, Text>(
+              new Counters.Counter(), new Progress());
+      Reporter reporter = new TestTaskReporter();
+      List<Text> texts = new ArrayList<Text>();
+      texts.add(new Text("first"));
+      texts.add(new Text("second"));
+      texts.add(new Text("third"));
+
+      reducer.reduce(bw, texts.iterator(), output, reporter);
+      reducer.close();
+      String stdOut = readStdOut(conf);
+      // test data: key
+      assertTrue(stdOut.contains("reducer key :true"));
+      // and values
+      assertTrue(stdOut.contains("reduce value  :first"));
+      assertTrue(stdOut.contains("reduce value  :second"));
+      assertTrue(stdOut.contains("reduce value  :third"));
+
+    } finally {
+      if (psw != null) {
+        // remove password files
+        for (File file : psw) {
+          file.deleteOnExit();
+        }
+      }
+    }
+
+  }
+
+  /**
+   * test PipesPartitioner
+   * test setting and getting data via PipesPartitioner
+   */
+  @Test
+  public void testPipesPartitioner() {
+
+    PipesPartitioner<IntWritable, Text> partitioner = new PipesPartitioner<IntWritable, Text>();
+    JobConf configuration = new JobConf();
+    Submitter.getJavaPartitioner(configuration);
+    partitioner.configure(new JobConf());
+    IntWritable iw = new IntWritable(4);
+    // the cache is empty
+    assertEquals(0, partitioner.getPartition(iw, new Text("test"), 2));
+    // set data into cache
+    PipesPartitioner.setNextPartition(3);
+    // get data from cache
+    assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
+  }
+
+  /**
+   * clean up previous stderr and stdout
+   */
+
+  private void initStdOut(JobConf configuration) {
+    TaskAttemptID taskId = TaskAttemptID.forName(configuration
+            .get(MRJobConfig.TASK_ATTEMPT_ID));
+    File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
+    File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
+    // prepare folder
+    if (!stdOut.getParentFile().exists()) {
+      stdOut.getParentFile().mkdirs();
+    } else { // clean logs
+      stdOut.deleteOnExit();
+      stdErr.deleteOnExit();
+    }
+  }
+
+  private String readStdOut(JobConf conf) throws Exception {
+    TaskAttemptID taskId = TaskAttemptID.forName(conf
+            .get(MRJobConfig.TASK_ATTEMPT_ID));
+    File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
+
+    return readFile(stdOut);
+
+  }
+
+  private String readFile(File file) throws Exception {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    InputStream is = new FileInputStream(file);
+    byte[] buffer = new byte[1024];
+    int counter = 0;
+    while ((counter = is.read(buffer)) >= 0) {
+      out.write(buffer, 0, counter);
+    }
+
+    is.close();
+
+    return out.toString();
+
+  }
+
+  private class Progress implements Progressable {
+
+    @Override
+    public void progress() {
+
+    }
+
+  }
+
+  private File[] cleanTokenPasswordFile() throws Exception {
+    File[] result = new File[2];
+    result[0] = new File("./jobTokenPassword");
+    if (result[0].exists()) {
+      FileUtil.chmod(result[0].getAbsolutePath(), "700");
+      assertTrue(result[0].delete());
+    }
+    result[1] = new File("./.jobTokenPassword.crc");
+    if (result[1].exists()) {
+      FileUtil.chmod(result[1].getAbsolutePath(), "700");
+      result[1].delete();
+    }
+    return result;
+  }
+
+  private File getFileCommand(String clazz) throws Exception {
+    String classpath = System.getProperty("java.class.path");
+    File fCommand = new File(workSpace + File.separator + "cache.sh");
+    fCommand.deleteOnExit();
+    if (!fCommand.getParentFile().exists()) {
+      fCommand.getParentFile().mkdirs();
+    }
+    fCommand.createNewFile();
+    OutputStream os = new FileOutputStream(fCommand);
+    os.write("#!/bin/sh \n".getBytes());
+    if (clazz == null) {
+      os.write(("ls ").getBytes());
+    } else {
+      os.write(("java -cp " + classpath + " " + clazz).getBytes());
+    }
+    os.flush();
+    os.close();
+    FileUtil.chmod(fCommand.getAbsolutePath(), "700");
+    return fCommand;
+  }
+
+  private class CombineOutputCollector<K, V extends Object> implements
+          OutputCollector<K, V> {
+    private Writer<K, V> writer;
+    private Counters.Counter outCounter;
+    private Progressable progressable;
+
+    public CombineOutputCollector(Counters.Counter outCounter,
+                                  Progressable progressable) {
+      this.outCounter = outCounter;
+      this.progressable = progressable;
+    }
+
+    public synchronized void setWriter(Writer<K, V> writer) {
+      this.writer = writer;
+    }
+
+    public synchronized void collect(K key, V value) throws IOException {
+      outCounter.increment(1);
+      writer.append(key, value);
+      progressable.progress();
+    }
+  }
+
+  public static class FakeSplit implements InputSplit {
+    public void write(DataOutput out) throws IOException {
+    }
+
+    public void readFields(DataInput in) throws IOException {
+    }
+
+    public long getLength() {
+      return 0L;
+    }
+
+    public String[] getLocations() {
+      return new String[0];
+    }
+  }
+
+  private class TestTaskReporter implements Reporter {
+    private int recordNum = 0; // number of records processed
+    private String status = null;
+    private Counters counters = new Counters();
+    private InputSplit split = new FakeSplit();
+
+    @Override
+    public void progress() {
+
+      recordNum++;
+    }
+
+    @Override
+    public void setStatus(String status) {
+      this.status = status;
+
+    }
+
+    public String getStatus() {
+      return this.status;
+
+    }
+
+    public Counters.Counter getCounter(String group, String name) {
+      Counters.Counter counter = null;
+      if (counters != null) {
+        counter = counters.findCounter(group, name);
+        if (counter == null) {
+          Group grp = counters.addGroup(group, group);
+          counter = grp.addCounter(name, name, 10);
+        }
+      }
+      return counter;
+    }
+
+    public Counters.Counter getCounter(Enum<?> name) {
+      return counters == null ? null : counters.findCounter(name);
+    }
+
+    public void incrCounter(Enum<?> key, long amount) {
+      if (counters != null) {
+        counters.incrCounter(key, amount);
+      }
+    }
+
+    public void incrCounter(String group, String counter, long amount) {
+
+      if (counters != null) {
+        counters.incrCounter(group, counter, amount);
+      }
+
+    }
+
+    @Override
+    public InputSplit getInputSplit() throws UnsupportedOperationException {
+      return split;
+    }
+
+    @Override
+    public float getProgress() {
+      return recordNum;
+    }
+
+  }
+
+  private class Reader implements RecordReader<FloatWritable, NullWritable> {
+    private int index = 0;
+    private FloatWritable progress;
+
+    @Override
+    public boolean next(FloatWritable key, NullWritable value)
+            throws IOException {
+      progress = key;
+      index++;
+      return index <= 10;
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return progress.get();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+
+      return index;
+    }
+
+    @Override
+    public NullWritable createValue() {
+
+      return NullWritable.get();
+    }
+
+    @Override
+    public FloatWritable createKey() {
+      FloatWritable result = new FloatWritable(index);
+      return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+
+  private class ReaderPipesMapRunner implements RecordReader<FloatWritable, NullWritable> {
+    private int index = 0;
+
+    @Override
+    public boolean next(FloatWritable key, NullWritable value)
+            throws IOException {
+      key.set(index++);
+      return index <= 10;
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return index;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+
+      return index;
+    }
+
+    @Override
+    public NullWritable createValue() {
+
+      return NullWritable.get();
+    }
+
+    @Override
+    public FloatWritable createKey() {
+      FloatWritable result = new FloatWritable(index);
+      return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+  }
+
+  private class FakeCollector extends
+          CombineOutputCollector<IntWritable, Text> {
+
+    final private Map<IntWritable, Text> collect = new HashMap<IntWritable, Text>();
+
+    public FakeCollector(Counter outCounter, Progressable progressable) {
+      super(outCounter, progressable);
+    }
+
+    @Override
+    public synchronized void collect(IntWritable key, Text value)
+            throws IOException {
+      collect.put(key, value);
+      super.collect(key, value);
+    }
+
+    public Map<IntWritable, Text> getCollect() {
+      return collect;
+    }
+  }
+}

+ 89 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipesNonJavaInputFormat.java

@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.pipes;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.pipes.TestPipeApplication.FakeSplit;
+import org.junit.Assert;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestPipesNonJavaInputFormat {
+  private static File workSpace = new File("target",
+      TestPipesNonJavaInputFormat.class.getName() + "-workSpace");
+
+  /**
+   * test PipesNonJavaInputFormat
+   */
+
+  @Test
+  public void testFormat() throws IOException {
+
+    PipesNonJavaInputFormat inputFormat = new PipesNonJavaInputFormat();
+    JobConf conf = new JobConf();
+
+    Reporter reporter= mock(Reporter.class);
+    RecordReader<FloatWritable, NullWritable> reader = inputFormat
+        .getRecordReader(new FakeSplit(), conf, reporter);
+    assertEquals(0.0f, reader.getProgress(), 0.001);
+
+    // input and output files
+    File input1 = new File(workSpace + File.separator + "input1");
+    if (!input1.getParentFile().exists()) {
+      Assert.assertTrue(input1.getParentFile().mkdirs());
+    }
+
+    if (!input1.exists()) {
+      Assert.assertTrue(input1.createNewFile());
+    }
+
+    File input2 = new File(workSpace + File.separator + "input2");
+    if (!input2.exists()) {
+      Assert.assertTrue(input2.createNewFile());
+    }
+    // set data for splits
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        input1.getAbsolutePath() + "," + input2.getAbsolutePath());
+    InputSplit[] splits = inputFormat.getSplits(conf, 2);
+    assertEquals(2, splits.length);
+
+    PipesNonJavaInputFormat.PipesDummyRecordReader dummyRecordReader = new PipesNonJavaInputFormat.PipesDummyRecordReader(
+        conf, splits[0]);
+    // empty dummyRecordReader
+    assertNull(dummyRecordReader.createKey());
+    assertNull(dummyRecordReader.createValue());
+    assertEquals(0, dummyRecordReader.getPos());
+    assertEquals(0.0, dummyRecordReader.getProgress(), 0.001);
+    // test the next() method
+    assertTrue(dummyRecordReader.next(new FloatWritable(2.0f), NullWritable.get()));
+    assertEquals(2.0, dummyRecordReader.getProgress(), 0.001);
+    dummyRecordReader.close();
+  }
+
+}
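These tests live in the hadoop-mapreduce-client-jobclient module; assuming the standard Surefire setup, a typical way to run just this patch's tests from that module is mvn test -Dtest=TestPipeApplication,TestPipesNonJavaInputFormat.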