
HADOOP-6576. Fix streaming test failures on 0.20. Contributed by Todd Lipcon

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20@911707 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas committed 15 years ago · commit 6fc48c399c
17 changed files with 250 additions and 341 deletions
  1. CHANGES.txt (+2 -0)
  2. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (+2 -1)
  3. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java (+33 -51)
  4. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (+64 -68)
  5. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java (+1 -10)
  6. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java (+1 -10)
  7. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java (+1 -10)
  8. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java (+1 -3)
  9. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (+11 -21)
  10. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (+24 -27)
  11. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java (+3 -5)
  12. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingExitStatus.java (+38 -28)
  13. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java (+1 -10)
  14. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java (+1 -10)
  15. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java (+1 -1)
  16. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java (+12 -20)
  17. src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (+54 -66)
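
The recurring fix across these files: the tests wrapped job execution in try/catch blocks that only printed the stack trace, so a failing streaming job still let the test pass. The patch removes those blocks and declares throws Exception so the JUnit runner reports the failure. A minimal JUnit 3 sketch of the before and after; runJob() is an illustrative stand-in for the streaming-job logic, not a method from the patch:

import junit.framework.TestCase;

public class ExitPropagationSketch extends TestCase {

  // Illustrative stand-in for submitting a streaming job.
  private void runJob() throws Exception {
    throw new Exception("Job Failed");
  }

  // Old style: the failure is printed to stderr, but the test passes.
  public void testSwallowed() {
    try {
      runJob();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  // New style: the exception propagates and the JUnit runner reports
  // the test as an error, with the full stack trace.
  public void testPropagated() throws Exception {
    runJob();
  }
}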

+ 2 - 0
CHANGES.txt

@@ -63,6 +63,8 @@ Release 0.20.2 - 2010-2-18
     HADOOP-6575. Remove call to fault injection tests not present in 0.20.
     (cdouglas)
 
+    HADOOP-6576. Fix streaming test failures on 0.20. (Todd Lipcon via cdouglas)
+
   IMPROVEMENTS
 
     HADOOP-5611. Fix C++ libraries to build on Debian Lenny. (Todd Lipcon

+ 2 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java

@@ -30,11 +30,12 @@ public class TestGzipInput extends TestStreaming
 {
 
   public TestGzipInput() throws IOException {
-    INPUT_FILE = new File("input.txt.gz");
+    INPUT_FILE = new File(TEST_DIR, "input.txt.gz");
   }
   
   protected void createInput() throws IOException
   {
+    assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
     GZIPOutputStream out = new GZIPOutputStream(
                                                 new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
     out.write(input.getBytes("UTF-8"));

+ 33 - 51
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleArchiveFiles.java

@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.DataOutputStream;
 import java.io.InputStreamReader;
 import java.io.BufferedReader;
+import java.util.Arrays;
 import java.util.zip.ZipEntry;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipOutputStream;
@@ -43,10 +44,11 @@ public class TestMultipleArchiveFiles extends TestStreaming
 {
 
   private StreamJob job;
-  private String INPUT_FILE = "input.txt";
-  private String CACHE_ARCHIVE_1 = "cacheArchive1.zip";
+  private String INPUT_DIR = "multiple-archive-files/";
+  private String INPUT_FILE = INPUT_DIR + "input.txt";
+  private String CACHE_ARCHIVE_1 = INPUT_DIR + "cacheArchive1.zip";
   private File CACHE_FILE_1 = null;
-  private String CACHE_ARCHIVE_2 = "cacheArchive2.zip";
+  private String CACHE_ARCHIVE_2 = INPUT_DIR + "cacheArchive2.zip";
   private File CACHE_FILE_2 = null;
   private String expectedOutput = null;
   private String OUTPUT_DIR = "out";
@@ -58,27 +60,23 @@ public class TestMultipleArchiveFiles extends TestStreaming
   private String strNamenode = null;
   private String namenode = null;
 
-  public TestMultipleArchiveFiles() throws IOException {
+  public TestMultipleArchiveFiles() throws Exception {
     CACHE_FILE_1 = new File("cacheArchive1");
     CACHE_FILE_2 = new File("cacheArchive2");
     input = "HADOOP";
     expectedOutput = "HADOOP\t\nHADOOP\t\n";
-    try {
-      conf = new Configuration();      
-      dfs = new MiniDFSCluster(conf, 1, true, null);      
-      fileSys = dfs.getFileSystem();
-      namenode = fileSys.getUri().getAuthority();
-      mr  = new MiniMRCluster(1, namenode, 3);
-      strJobTracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
-      strNamenode = "fs.default.name=" + namenode;
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+    conf = new Configuration();      
+    dfs = new MiniDFSCluster(conf, 1, true, null);      
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().getAuthority();
+    mr  = new MiniMRCluster(1, namenode, 3);
+    strJobTracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
+    strNamenode = "fs.default.name=" + namenode;
   }
   
   protected void createInput() throws IOException
   {
-
+    fileSys.delete(new Path(INPUT_DIR), true);
     DataOutputStream dos = fileSys.create(new Path(INPUT_FILE));
     String inputFileString = "symlink1/cacheArchive1\nsymlink2/cacheArchive2";
     dos.write(inputFileString.getBytes("UTF-8"));
@@ -102,14 +100,9 @@ public class TestMultipleArchiveFiles extends TestStreaming
   }
 
   protected String[] genArgs() {
-    String cacheArchiveString1 = null;
-    String cacheArchiveString2 = null;
-    try {
-      cacheArchiveString1 = fileSys.getUri().toString()+fileSys.getWorkingDirectory().toString()+"/"+CACHE_ARCHIVE_1+"#symlink1";
-      cacheArchiveString2 = fileSys.getUri().toString()+fileSys.getWorkingDirectory().toString()+"/"+CACHE_ARCHIVE_2+"#symlink2";
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+    String workDir = fileSys.getWorkingDirectory().toString() + "/";
+    String cache1 = workDir + CACHE_ARCHIVE_1 + "#symlink1";
+    String cache2 = workDir + CACHE_ARCHIVE_2 + "#symlink2";
 
     return new String[] {
       "-input", INPUT_FILE.toString(),
@@ -117,39 +110,28 @@ public class TestMultipleArchiveFiles extends TestStreaming
       "-mapper", "xargs cat", 
       "-reducer", "cat",
       "-jobconf", "mapred.reduce.tasks=1",
-      "-cacheArchive", cacheArchiveString1, 
-      "-cacheArchive", cacheArchiveString2,
+      "-cacheArchive", cache1, 
+      "-cacheArchive", cache2,
       "-jobconf", strNamenode,
       "-jobconf", strJobTracker,
       "-jobconf", "stream.tmpdir=" + System.getProperty("test.build.data","/tmp")
     };
   }
 
-  public void testCommandLine() {
-    try {
-      createInput();
-      job = new StreamJob(genArgs(), true);
-      if(job.go() != 0) {
-        throw new Exception("Job Failed");
-      }
-      StringBuffer output = new StringBuffer(256);
-      Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
-                                            new Path(OUTPUT_DIR)));
-      for (int i = 0; i < fileList.length; i++){
-        BufferedReader bread =
-          new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
-        output.append(bread.readLine());
-        output.append("\n");
-        output.append(bread.readLine());
-        output.append("\n");
-      }
-      assertEquals(expectedOutput, output.toString());
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      CACHE_FILE_1.delete();
-      CACHE_FILE_2.delete();
-    }
+  public void testCommandLine() throws IOException {
+   createInput();
+   job = new StreamJob(genArgs(), true);
+   if(job.go() != 0) {
+     throw new IOException("Job Failed");
+   }
+   StringBuffer output = new StringBuffer(256);
+   Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
+                                         new Path(OUTPUT_DIR)));
+   for (int i = 0; i < fileList.length; i++){
+     if (fileList[i].getName().equals("_logs")) continue;
+     output.append(StreamUtil.slurpHadoop(fileList[i], fileSys));
+   }
+   assertEquals(expectedOutput, output.toString());
   }
 
   public static void main(String[]args) throws Exception
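
The rewritten testCommandLine also changes how output is read: it skips the _logs directory that the framework creates inside the job output directory and slurps each remaining part file whole. A self-contained sketch of that idiom, assuming StreamUtil.slurpHadoop is reachable from the same package, as it is in the tests above; the class and method names are illustrative:

package org.apache.hadoop.streaming;   // same package as StreamUtil

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class OutputSlurpSketch {
  static String readJobOutput(FileSystem fs, Path outDir) throws IOException {
    StringBuffer output = new StringBuffer();
    Path[] files = FileUtil.stat2Paths(fs.listStatus(outDir));
    for (Path p : files) {
      if (p.getName().equals("_logs")) {
        continue;                        // job history dir, not test output
      }
      output.append(StreamUtil.slurpHadoop(p, fs));
    }
    return output.toString();
  }
}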

+ 64 - 68
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java

@@ -50,80 +50,76 @@ public class TestMultipleCachefiles extends TestCase
   {
   }
 
-  public void testMultipleCachefiles()
+  public void testMultipleCachefiles() throws Exception
   {
-    try {
-      boolean mayExit = false;
-      MiniMRCluster mr = null;
-      MiniDFSCluster dfs = null; 
-      try{
-        Configuration conf = new Configuration();
-        dfs = new MiniDFSCluster(conf, 1, true, null);
-        FileSystem fileSys = dfs.getFileSystem();
-        String namenode = fileSys.getName();
-        mr  = new MiniMRCluster(1, namenode, 3);
-        // During tests, the default Configuration will use a local mapred
-        // So don't specify -config or -cluster
-        String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
-        String strNamenode = "fs.default.name=" + namenode;
-        String argv[] = new String[] {
-          "-input", INPUT_FILE,
-          "-output", OUTPUT_DIR,
-          "-mapper", map,
-          "-reducer", reduce,
-          //"-verbose",
-          //"-jobconf", "stream.debug=set"
-          "-jobconf", strNamenode,
-          "-jobconf", strJobtracker,
-          "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
-          "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
-                      "-Dbuild.test=" + System.getProperty("build.test") + " " +
-                      conf.get("mapred.child.java.opts",""),
-          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#" + mapString,
-          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE_2 + "#" + mapString2
-        };
+    boolean mayExit = false;
+    MiniMRCluster mr = null;
+    MiniDFSCluster dfs = null; 
+    try{
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fileSys = dfs.getFileSystem();
+      String namenode = fileSys.getName();
+      mr  = new MiniMRCluster(1, namenode, 3);
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
+      String strNamenode = "fs.default.name=" + namenode;
+      String argv[] = new String[] {
+        "-input", INPUT_FILE,
+        "-output", OUTPUT_DIR,
+        "-mapper", map,
+        "-reducer", reduce,
+        //"-verbose",
+        //"-jobconf", "stream.debug=set"
+        "-jobconf", strNamenode,
+        "-jobconf", strJobtracker,
+        "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+        "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+                    "-Dbuild.test=" + System.getProperty("build.test") + " " +
+                    conf.get("mapred.child.java.opts",""),
+        "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#" + mapString,
+        "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE_2 + "#" + mapString2
+      };
 
-        fileSys.delete(new Path(OUTPUT_DIR));
+      fileSys.delete(new Path(OUTPUT_DIR));
+      
+      DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+      file.writeBytes(mapString + "\n");
+      file.writeBytes(mapString2 + "\n");
+      file.close();
+      file = fileSys.create(new Path(CACHE_FILE));
+      file.writeBytes(cacheString);
+      file.close();
+      file = fileSys.create(new Path(CACHE_FILE_2));
+      file.writeBytes(cacheString2);
+      file.close();
         
-        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
-        file.writeBytes(mapString + "\n");
-        file.writeBytes(mapString2 + "\n");
-        file.close();
-        file = fileSys.create(new Path(CACHE_FILE));
-        file.writeBytes(cacheString);
-        file.close();
-        file = fileSys.create(new Path(CACHE_FILE_2));
-        file.writeBytes(cacheString2);
-        file.close();
-          
-        job = new StreamJob(argv, mayExit);     
-        job.go();
+      job = new StreamJob(argv, mayExit);     
+      job.go();
 
-	fileSys = dfs.getFileSystem();
-        String line = null;
-        String line2 = null;
-        Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
-                                     new Path(OUTPUT_DIR),
-                                     new OutputLogFilter()));
-        for (int i = 0; i < fileList.length; i++){
-          System.out.println(fileList[i].toString());
-          BufferedReader bread =
-            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
-          line = bread.readLine();
-          System.out.println(line);
-          line2 = bread.readLine();
-          System.out.println(line2);
-        }
-        assertEquals(cacheString + "\t", line);
-        assertEquals(cacheString2 + "\t", line2);
-      } finally{
-        if (dfs != null) { dfs.shutdown(); }
-        if (mr != null) { mr.shutdown();}
+      fileSys = dfs.getFileSystem();
+      String line = null;
+      String line2 = null;
+      Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
+                                   new Path(OUTPUT_DIR),
+                                   new OutputLogFilter()));
+      for (int i = 0; i < fileList.length; i++){
+        System.out.println(fileList[i].toString());
+        BufferedReader bread =
+          new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+        line = bread.readLine();
+        System.out.println(line);
+        line2 = bread.readLine();
+        System.out.println(line2);
       }
-      
-    } catch(Exception e) {
-      failTrace(e);
+      assertEquals(cacheString + "\t", line);
+      assertEquals(cacheString2 + "\t", line2);
+    } finally{
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();}
     }
+    
   }
 
   void failTrace(Exception e)
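
With the outer try/catch and failTrace gone, teardown rests entirely on the finally block. A skeleton of the lifecycle the updated cluster tests share; the mini-cluster package names are my assumption for the 0.20 test jars:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;

public class MiniClusterSketch {
  public void runWithClusters() throws Exception {
    MiniMRCluster mr = null;
    MiniDFSCluster dfs = null;
    try {
      Configuration conf = new Configuration();
      dfs = new MiniDFSCluster(conf, 1, true, null);  // 1 datanode, format
      String namenode = dfs.getFileSystem().getUri().getAuthority();
      mr = new MiniMRCluster(1, namenode, 3);         // 1 tracker, 3 dirs
      // ... submit the streaming job and assert on its output here;
      // any exception propagates straight to the JUnit runner ...
    } finally {
      if (dfs != null) { dfs.shutdown(); }
      if (mr != null) { mr.shutdown(); }
    }
  }
}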

+ 1 - 10
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamAggregate.java

@@ -70,7 +70,7 @@ public class TestStreamAggregate extends TestCase
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws Exception
   {
     try {
       try {
@@ -91,8 +91,6 @@ public class TestStreamAggregate extends TestCase
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
@@ -101,13 +99,6 @@ public class TestStreamAggregate extends TestCase
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreaming().testCommandLine();

+ 1 - 10
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamDataProtocol.java

@@ -78,7 +78,7 @@ public class TestStreamDataProtocol extends TestCase
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws Exception
   {
     try {
       try {
@@ -100,8 +100,6 @@ public class TestStreamDataProtocol extends TestCase
       System.err.println("  out1=" + output);
       System.err.println("  equals=" + outputExpect.compareTo(output));
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
@@ -110,13 +108,6 @@ public class TestStreamDataProtocol extends TestCase
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreamDataProtocol().testCommandLine();

+ 1 - 10
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamReduceNone.java

@@ -69,7 +69,7 @@ public class TestStreamReduceNone extends TestCase
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws Exception
   {
     String outFileName = "part-00000";
     File outFile = null;
@@ -91,8 +91,6 @@ public class TestStreamReduceNone extends TestCase
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       outFile.delete();
       File outFileCRC = new File(OUTPUT_DIR, "."+outFileName+".crc").getAbsoluteFile();
@@ -102,13 +100,6 @@ public class TestStreamReduceNone extends TestCase
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreamReduceNone().testCommandLine();

+ 1 - 3
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlRecordReader.java

@@ -58,7 +58,7 @@ public class TestStreamXmlRecordReader extends TestStreaming
     };
   }
 
-  public void testCommandLine() {
+  public void testCommandLine() throws IOException {
     try {
       try {
         OUTPUT_DIR.getAbsoluteFile().delete();
@@ -71,8 +71,6 @@ public class TestStreamXmlRecordReader extends TestStreaming
       String output = StreamUtil.slurp(outFile);
       outFile.delete();
       assertEquals(input, output);
-    } catch (Exception e) {
-      e.printStackTrace();
     } finally {
       INPUT_FILE.delete();
       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();

+ 11 - 21
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java

@@ -104,24 +104,16 @@ public class TestStreamedMerge extends TestCase {
     return c;
   }
 
-  void lsr() {
-    try {
-      System.out.println("lsr /");
-      ToolRunner.run(conf_, new FsShell(), new String[]{ "-lsr", "/" });
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+  void lsr() throws Exception  {
+    System.out.println("lsr /");
+    ToolRunner.run(conf_, new FsShell(), new String[]{ "-lsr", "/" });
   }
 
-  void printSampleInput() {
-    try {
-      System.out.println("cat /input/part-00");
-      String content = StreamUtil.slurpHadoop(new Path("/input/part-00"), fs_);
-      System.out.println(content);
-      System.out.println("cat done.");
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
+  void printSampleInput() throws IOException {
+    System.out.println("cat /input/part-00");
+    String content = StreamUtil.slurpHadoop(new Path("/input/part-00"), fs_);
+    System.out.println(content);
+    System.out.println("cat done.");
   }
 
   void callStreaming(String argSideOutput, boolean inputTagged) throws IOException {
@@ -210,7 +202,7 @@ public class TestStreamedMerge extends TestCase {
     StringBuffer buf_;
   }
 
-  public void testMain() throws IOException {
+  public void testMain() throws Exception {
     boolean success = false;
     String base = new File(".").getAbsolutePath();
     System.setProperty("hadoop.log.dir", base + "/logs");
@@ -228,8 +220,6 @@ public class TestStreamedMerge extends TestCase {
       }
       doAllTestJobs();
       success = true;
-    } catch (IOException io) {
-      io.printStackTrace();
     } finally {
       try {
         fs_.close();
@@ -243,14 +233,14 @@ public class TestStreamedMerge extends TestCase {
     }
   }
 
-  void doAllTestJobs() throws IOException
+  void doAllTestJobs() throws Exception 
   {
     goSocketTagged(true, false);
     goSocketTagged(false, false);
     goSocketTagged(true, true);
   }
   
-  void goSocketTagged(boolean socket, boolean inputTagged) throws IOException {
+  void goSocketTagged(boolean socket, boolean inputTagged) throws Exception {
     System.out.println("***** goSocketTagged: " + socket + ", " + inputTagged);
     String expect = createInputs(inputTagged);
     lsr();

+ 24 - 27
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java

@@ -29,8 +29,9 @@ public class TestStreaming extends TestCase
 
   // "map" command: grep -E (red|green|blue)
   // reduce command: uniq
-  protected File INPUT_FILE = new File("input.txt");
-  protected File OUTPUT_DIR = new File("out");
+  protected File TEST_DIR;
+  protected File INPUT_FILE;
+  protected File OUTPUT_DIR;
   protected String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
   protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
@@ -46,10 +47,16 @@ public class TestStreaming extends TestCase
     UtilTest utilTest = new UtilTest(getClass().getName());
     utilTest.checkUserDir();
     utilTest.redirectIfAntJunit();
+    TEST_DIR = new File(getClass().getName()).getAbsoluteFile();
+    OUTPUT_DIR = new File(TEST_DIR, "out");
+    INPUT_FILE = new File(TEST_DIR, "input.txt");
   }
 
   protected void createInput() throws IOException
   {
+    if (!TEST_DIR.exists()) {
+      assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+    }
     DataOutputStream out = new DataOutputStream(
                                                 new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
     out.write(input.getBytes("UTF-8"));
@@ -69,33 +76,23 @@ public class TestStreaming extends TestCase
     };
   }
   
-  public void testCommandLine() throws IOException
+  public void testCommandLine() throws Exception
   {
-    try {
-      try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
-      } catch (Exception e) {
-      }
-
-      createInput();
-      boolean mayExit = false;
+    UtilTest.recursiveDelete(TEST_DIR);
 
-      // During tests, the default Configuration will use a local mapred
-      // So don't specify -config or -cluster
-      job = new StreamJob(genArgs(), mayExit);      
-      job.go();
-      File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
-      String output = StreamUtil.slurp(outFile);
-      outFile.delete();
-      System.err.println("outEx1=" + outputExpect);
-      System.err.println("  out1=" + output);
-      assertEquals(outputExpect, output);
-    } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
-      INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
-    }
+    createInput();
+    boolean mayExit = false;
+ 
+    // During tests, the default Configuration will use a local mapred
+    // So don't specify -config or -cluster
+    job = new StreamJob(genArgs(), mayExit);      
+    job.go();
+    File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+    String output = StreamUtil.slurp(outFile);
+    outFile.delete();
+    System.err.println("outEx1=" + outputExpect);
+    System.err.println("  out1=" + output);
+    assertEquals(outputExpect, output);
   }
 
   public static void main(String[]args) throws Exception
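
TestStreaming and its subclasses now keep all test files under a directory derived from the test class name instead of dropping input.txt and out/ into the shared working directory, where stale or concurrent runs can collide. A small sketch of the idiom; the class here is only an illustration:

import java.io.File;

public class ScratchDirSketch {
  public static void main(String[] args) {
    // Per-class scratch directory, as TestStreaming's constructor sets up.
    File testDir = new File(ScratchDirSketch.class.getName()).getAbsoluteFile();
    File inputFile = new File(testDir, "input.txt");
    File outputDir = new File(testDir, "out");
    System.out.println("mkdirs: " + testDir.mkdirs());
    System.out.println("input=" + inputFile + " output=" + outputDir);
  }
}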

+ 3 - 5
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingCounters.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.streaming;
 
 import java.io.File;
 import java.io.IOException;
-
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
@@ -38,7 +38,7 @@ public class TestStreamingCounters extends TestStreaming {
   {
     try {
       try {
-        OUTPUT_DIR.getAbsoluteFile().delete();
+        FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
       } catch (Exception e) {
       }
 
@@ -62,10 +62,8 @@ public class TestStreamingCounters extends TestStreaming {
       assertNotNull("Counter", counter);
       assertEquals(3, counter.getCounter());
     } finally {
-      File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
-      outFileCRC.delete();
-      OUTPUT_DIR.getAbsoluteFile().delete();
+      FileUtil.fullyDelete(OUTPUT_DIR.getAbsoluteFile());
     }
   }
   

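The switch from OUTPUT_DIR.delete() to FileUtil.fullyDelete matters because java.io.File.delete() refuses to remove a non-empty directory, so the old cleanup silently left part files and _logs behind between runs. A small sketch of the difference; the path is illustrative, and the checked exception on fullyDelete reflects the 0.20 API as far as I can tell:

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileUtil;

public class CleanupSketch {
  public static void main(String[] args) throws IOException {
    File outDir = new File("out").getAbsoluteFile();
    System.out.println(outDir.delete());  // false while "out" has contents
    FileUtil.fullyDelete(outDir);         // deletes part files, _logs, "out"
  }
}
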
+ 38 - 28
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingExitStatus.java

@@ -24,6 +24,7 @@ import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * This class tests if hadoopStreaming fails a job when the mapper or
@@ -32,8 +33,11 @@ import org.apache.hadoop.fs.Path;
  */
 public class TestStreamingExitStatus extends TestCase
 {
-  protected File INPUT_FILE = new File("input.txt");
-  protected File OUTPUT_DIR = new File("out");  
+  protected File TEST_DIR =
+    new File("TestStreamingExitStatus").getAbsoluteFile();
+
+  protected File INPUT_FILE = new File(TEST_DIR, "input.txt");
+  protected File OUTPUT_DIR = new File(TEST_DIR, "out");  
 
   protected String failingTask = StreamUtil.makeJavaCommand(FailApp.class, new String[]{"true"});
   protected String echoTask = StreamUtil.makeJavaCommand(FailApp.class, new String[]{"false"});
@@ -41,7 +45,6 @@ public class TestStreamingExitStatus extends TestCase
   public TestStreamingExitStatus() throws IOException {
     UtilTest utilTest = new UtilTest(getClass().getName());
     utilTest.checkUserDir();
-    utilTest.redirectIfAntJunit();
   }
 
   protected String[] genArgs(boolean exitStatusIsFailure, boolean failMap) {
@@ -57,51 +60,58 @@ public class TestStreamingExitStatus extends TestCase
   }
 
   public void setUp() throws IOException {
-    UtilTest.recursiveDelete(INPUT_FILE);
-    UtilTest.recursiveDelete(OUTPUT_DIR);
-    
+    UtilTest.recursiveDelete(TEST_DIR);
+    assertTrue(TEST_DIR.mkdirs());
+
     FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
     out.write("hello\n".getBytes());
     out.close();
   }
 
-  public void runStreamJob(boolean exitStatusIsFailure, boolean failMap) {
-    try {
-      boolean mayExit = false;
-      int returnStatus = 0;
-
-      StreamJob job = new StreamJob(genArgs(exitStatusIsFailure, failMap), mayExit);
-      returnStatus = job.go();
-      
-      if (exitStatusIsFailure) {
-        assertEquals("Streaming Job failure code expected", /*job not successful:*/1, returnStatus);
+  private static String join(CharSequence separator, Iterable<String> strings) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (String s : strings) {
+      if (first) {
+        first = false;
       } else {
-        assertEquals("Streaming Job expected to succeed", 0, returnStatus);
+        sb.append(separator);
       }
-    } catch (Exception e) {
-      failTrace(e);
+      sb.append(s);
+    }
+    return sb.toString();
+  }
+
+  public void runStreamJob(boolean exitStatusIsFailure, boolean failMap) throws Exception {
+    boolean mayExit = false;
+    int returnStatus = 0;
+    String args[] = genArgs(exitStatusIsFailure, failMap);
+    System.err.println("Testing streaming command line:\n" +
+               join(" ", Arrays.asList(args)));
+    StreamJob job = new StreamJob(genArgs(exitStatusIsFailure, failMap), mayExit);
+    returnStatus = job.go();
+    
+    if (exitStatusIsFailure) {
+      assertEquals("Streaming Job failure code expected", /*job not successful:*/1, returnStatus);
+    } else {
+      assertEquals("Streaming Job expected to succeed", 0, returnStatus);
     }
   }
   
-  public void testMapFailOk() {
+  public void testMapFailOk() throws Exception {
     runStreamJob(false, true);
   }
   
-  public void testMapFailNotOk() {
+  public void testMapFailNotOk() throws Exception {
     runStreamJob(true, true);
   }
   
-  public void testReduceFailOk() {
+  public void testReduceFailOk() throws Exception {
     runStreamJob(false, false);
   }
   
-  public void testReduceFailNotOk() {
+  public void testReduceFailNotOk() throws Exception {
     runStreamJob(true, false);
   }  
   
-  protected void failTrace(Exception e) {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
 }

+ 1 - 10
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingKeyValue.java

@@ -72,7 +72,7 @@ public class TestStreamingKeyValue extends TestCase
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws Exception
   {
     String outFileName = "part-00000";
     File outFile = null;
@@ -94,8 +94,6 @@ public class TestStreamingKeyValue extends TestCase
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       outFile.delete();
       File outFileCRC = new File(OUTPUT_DIR,
@@ -106,13 +104,6 @@ public class TestStreamingKeyValue extends TestCase
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreamingKeyValue().testCommandLine();

+ 1 - 10
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java

@@ -85,7 +85,7 @@ public class TestStreamingSeparator extends TestCase
     };
   }
   
-  public void testCommandLine()
+  public void testCommandLine() throws Exception
   {
     try {
       try {
@@ -106,8 +106,6 @@ public class TestStreamingSeparator extends TestCase
       System.err.println("outEx1=" + outputExpect);
       System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
-    } catch(Exception e) {
-      failTrace(e);
     } finally {
       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
       INPUT_FILE.delete();
@@ -116,13 +114,6 @@ public class TestStreamingSeparator extends TestCase
     }
   }
 
-  private void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreamingSeparator().testCommandLine();

+ 1 - 1
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStatus.java

@@ -92,7 +92,7 @@ public class TestStreamingStatus extends TestCase {
 
       TaskReport[] reports = job.jc_.getMapTaskReports(job.jobId_);
       assertEquals(1, reports.length);
-      assertEquals("starting echo > sort", reports[0].getState());
+      assertEquals("starting echo", reports[0].getState());
     } finally {
       if (fs != null) { clean(fs); }
       if (mr != null) { mr.shutdown(); }

+ 12 - 20
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingStderr.java

@@ -71,43 +71,35 @@ public class TestStreamingStderr extends TestCase
   }
 
   public void runStreamJob(String baseName, boolean hasInput,
-                           int preLines, int duringLines, int postLines) {
-    try {
-      File input = setupInput(baseName, hasInput);
-      File output = setupOutput(baseName);
-      boolean mayExit = false;
-      int returnStatus = 0;
+                           int preLines, int duringLines, int postLines)
+    throws Exception {
+    File input = setupInput(baseName, hasInput);
+    File output = setupOutput(baseName);
+    boolean mayExit = false;
+    int returnStatus = 0;
 
-      StreamJob job = new StreamJob(genArgs(input, output, preLines, duringLines, postLines), mayExit);
-      returnStatus = job.go();
-      assertEquals("StreamJob success", 0, returnStatus);
-    } catch (Exception e) {
-      failTrace(e);
-    }
+    StreamJob job = new StreamJob(genArgs(input, output, preLines, duringLines, postLines), mayExit);
+    returnStatus = job.go();
+    assertEquals("StreamJob success", 0, returnStatus);
   }
 
   // This test will fail by blocking forever if the stderr isn't
   // consumed by Hadoop for tasks that don't have any input.
-  public void testStderrNoInput() throws IOException {
+  public void testStderrNoInput() throws Exception {
     runStreamJob("stderr-pre", false, 10000, 0, 0);
   }
 
   // Streaming should continue to read stderr even after all input has
   // been consumed.
-  public void testStderrAfterOutput() throws IOException {
+  public void testStderrAfterOutput() throws Exception {
     runStreamJob("stderr-post", false, 0, 0, 10000);
   }
 
   // This test should produce a task timeout if stderr lines aren't
   // counted as progress. This won't actually work until
   // LocalJobRunner supports timeouts.
-  public void testStderrCountsAsProgress() throws IOException {
+  public void testStderrCountsAsProgress() throws Exception {
     runStreamJob("stderr-progress", true, 10, 1000, 0);
   }
   
-  protected void failTrace(Exception e) {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
 }
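
The comments in this test spell out the failure mode being guarded against: if nothing consumes a task's stderr, the pipe buffer fills and the child blocks forever on write. A generic illustration of the drain-on-a-thread pattern, not the streaming implementation itself:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class StderrDrainSketch {
  public static void main(String[] args) throws Exception {
    // Child that floods stderr; the shell command is illustrative.
    final Process p = new ProcessBuilder("sh", "-c",
        "i=0; while [ $i -lt 100000 ]; do echo line $i 1>&2; i=$((i+1)); done")
        .start();
    Thread drainer = new Thread(new Runnable() {
      public void run() {
        try {
          BufferedReader r = new BufferedReader(
              new InputStreamReader(p.getErrorStream()));
          while (r.readLine() != null) {
            // discard; streaming also counts stderr lines as task progress
          }
          r.close();
        } catch (IOException ignored) {
        }
      }
    });
    drainer.start();
    p.waitFor();   // without the drainer this can hang once the pipe fills
    drainer.join();
  }
}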

+ 54 - 66
src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java

@@ -47,80 +47,68 @@ public class TestSymLink extends TestCase
   {
   }
 
-  public void testSymLink()
+  public void testSymLink() throws Exception
   {
-    try {
-      boolean mayExit = false;
-      MiniMRCluster mr = null;
-      MiniDFSCluster dfs = null; 
-      try{
-        Configuration conf = new Configuration();
-        dfs = new MiniDFSCluster(conf, 1, true, null);
-        FileSystem fileSys = dfs.getFileSystem();
-        String namenode = fileSys.getName();
-        mr  = new MiniMRCluster(1, namenode, 3);
-        // During tests, the default Configuration will use a local mapred
-        // So don't specify -config or -cluster
-        String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
-        String strNamenode = "fs.default.name=" + namenode;
-        String argv[] = new String[] {
-          "-input", INPUT_FILE,
-          "-output", OUTPUT_DIR,
-          "-mapper", map,
-          "-reducer", reduce,
-          //"-verbose",
-          //"-jobconf", "stream.debug=set"
-          "-jobconf", strNamenode,
-          "-jobconf", strJobtracker,
-          "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
-          "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
-                      "-Dbuild.test=" + System.getProperty("build.test") + " " +
-                      conf.get("mapred.child.java.opts",""),
-          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
-        };
+    boolean mayExit = false;
+    MiniMRCluster mr = null;
+    MiniDFSCluster dfs = null; 
+    try{
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fileSys = dfs.getFileSystem();
+      String namenode = fileSys.getName();
+      mr  = new MiniMRCluster(1, namenode, 3);
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
+      String strNamenode = "fs.default.name=" + namenode;
+      String argv[] = new String[] {
+        "-input", INPUT_FILE,
+        "-output", OUTPUT_DIR,
+        "-mapper", map,
+        "-reducer", reduce,
+        //"-verbose",
+        //"-jobconf", "stream.debug=set"
+        "-jobconf", strNamenode,
+        "-jobconf", strJobtracker,
+        "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+        "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+                    "-Dbuild.test=" + System.getProperty("build.test") + " " +
+                    conf.get("mapred.child.java.opts",""),
+        "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
+      };
 
-        fileSys.delete(new Path(OUTPUT_DIR));
+      fileSys.delete(new Path(OUTPUT_DIR));
+      
+      DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+      file.writeBytes(mapString);
+      file.close();
+      file = fileSys.create(new Path(CACHE_FILE));
+      file.writeBytes(cacheString);
+      file.close();
         
-        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
-        file.writeBytes(mapString);
-        file.close();
-        file = fileSys.create(new Path(CACHE_FILE));
-        file.writeBytes(cacheString);
-        file.close();
-          
-        job = new StreamJob(argv, mayExit);      
-        job.go();
+      job = new StreamJob(argv, mayExit);      
+      job.go();
 
-        fileSys = dfs.getFileSystem();
-        String line = null;
-        Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
-                                                new Path(OUTPUT_DIR),
-                                                new OutputLogFilter()));
-        for (int i = 0; i < fileList.length; i++){
-          System.out.println(fileList[i].toString());
-          BufferedReader bread =
-            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
-          line = bread.readLine();
-          System.out.println(line);
-        }
-        assertEquals(cacheString + "\t", line);
-      } finally{
-        if (dfs != null) { dfs.shutdown(); }
-        if (mr != null) { mr.shutdown();}
+      fileSys = dfs.getFileSystem();
+      String line = null;
+      Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
+                                              new Path(OUTPUT_DIR),
+                                              new OutputLogFilter()));
+      for (int i = 0; i < fileList.length; i++){
+        System.out.println(fileList[i].toString());
+        BufferedReader bread =
+          new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+        line = bread.readLine();
+        System.out.println(line);
       }
-      
-    } catch(Exception e) {
-      failTrace(e);
+      assertEquals(cacheString + "\t", line);
+    } finally{
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();}
     }
   }
 
-  void failTrace(Exception e)
-  {
-    StringWriter sw = new StringWriter();
-    e.printStackTrace(new PrintWriter(sw));
-    fail(sw.toString());
-  }
-
   public static void main(String[]args) throws Exception
   {
     new TestStreaming().testCommandLine();