
Initial commit of code copied from Nutch.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@374735 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 19 years ago
commit e87470e3f8

+ 418 - 0
src/test/org/apache/hadoop/fs/TestNutchFileSystem.java

@@ -0,0 +1,418 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+public class TestNutchFileSystem extends TestCase {
+  private static final Logger LOG = InputFormatBase.LOG;
+
+  private static Configuration conf = new Configuration();
+  private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
+
+  private static final long MEGA = 1024 * 1024;
+  private static final int SEEKS_PER_FILE = 4;
+
+  private static String ROOT = System.getProperty("test.build.data","fs_test");
+  private static File CONTROL_DIR = new File(ROOT, "fs_control");
+  private static File WRITE_DIR = new File(ROOT, "fs_write");
+  private static File READ_DIR = new File(ROOT, "fs_read");
+  private static File DATA_DIR = new File(ROOT, "fs_data");
+
+  public void testFs() throws Exception {
+    testFs(10 * MEGA, 100, 0);
+  }
+
+  public static void testFs(long megaBytes, int numFiles, long seed)
+    throws Exception {
+
+    NutchFileSystem fs = NutchFileSystem.get(conf);
+
+    if (seed == 0)
+      seed = new Random().nextLong();
+
+    LOG.info("seed = "+seed);
+
+    createControlFile(fs, megaBytes, numFiles, seed);
+    writeTest(fs, false);
+    readTest(fs, false);
+    seekTest(fs, false);
+  }
+
+  public static void createControlFile(NutchFileSystem fs,
+                                       long megaBytes, int numFiles,
+                                       long seed) throws Exception {
+
+    LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files");
+
+    File controlFile = new File(CONTROL_DIR, "files");
+    fs.delete(controlFile);
+    Random random = new Random(seed);
+
+    SequenceFile.Writer writer =
+      new SequenceFile.Writer(fs, controlFile.toString(),
+                              UTF8.class, LongWritable.class);
+
+    long totalSize = 0;
+    long maxSize = ((megaBytes / numFiles) * 2) + 1;
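+    // Each record in the control file maps a random file name (which later
+    // doubles as that file's data seed) to a random size, until the requested
+    // total volume has been reached.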
+    try {
+      while (totalSize < megaBytes) {
+        UTF8 name = new UTF8(Long.toString(random.nextLong()));
+
+        long size = random.nextLong();
+        if (size < 0)
+          size = -size;
+        size = size % maxSize;
+
+        //LOG.info(" adding: name="+name+" size="+size);
+
+        writer.append(name, new LongWritable(size));
+
+        totalSize += size;
+      }
+    } finally {
+      writer.close();
+    }
+    LOG.info("created control file for: "+totalSize+" bytes");
+  }
+
+  public static class WriteMapper extends Configured implements Mapper {
+    private Random random = new Random();
+    private byte[] buffer = new byte[BUFFER_SIZE];
+    private NutchFileSystem fs;
+    private boolean fastCheck;
+    
+    {
+      try {
+        fs = NutchFileSystem.get(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public WriteMapper() { super(null); }
+    
+    public WriteMapper(Configuration conf) { super(conf); }
+
+    public void configure(JobConf job) {
+      setConf(job);
+      fastCheck = job.getBoolean("fs.test.fastCheck", false);
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter)
+      throws IOException {
+      String name = ((UTF8)key).toString();
+      long size = ((LongWritable)value).get();
+      long seed = Long.parseLong(name);
+
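+      // The file name doubles as the RNG seed, so ReadMapper and SeekMapper
+      // can later regenerate the identical byte stream for verification.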
+      random.setSeed(seed);
+      reporter.setStatus("creating " + name);
+
+      OutputStream out = fs.create(new File(DATA_DIR, name));
+
+      long written = 0;
+      try {
+        while (written < size) {
+          if (fastCheck) {
+            Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
+          } else {
+            random.nextBytes(buffer);
+          }
+          long remains = size - written;
+          int length = (remains<=buffer.length) ? (int)remains : buffer.length;
+          out.write(buffer, 0, length);
+          written += length;
+          reporter.setStatus("writing "+name+"@"+written+"/"+size);
+        }
+      } finally {
+        out.close();
+      }
+
+      collector.collect(new UTF8("bytes"), new LongWritable(written));
+
+      reporter.setStatus("wrote " + name);
+    }
+  }
+
+  public static void writeTest(NutchFileSystem fs, boolean fastCheck)
+    throws Exception {
+
+    fs.delete(DATA_DIR);
+    fs.delete(WRITE_DIR);
+    
+    JobConf job = new JobConf(conf);
+    job.setBoolean("fs.test.fastCheck", fastCheck);
+
+    job.setInputDir(CONTROL_DIR);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(LongWritable.class);
+
+    job.setMapperClass(WriteMapper.class);
+    job.setReducerClass(LongSumReducer.class);
+
+    job.setOutputDir(WRITE_DIR);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setNumReduceTasks(1);
+    JobClient.runJob(job);
+  }
+
+  public static class ReadMapper extends Configured implements Mapper {
+    private Random random = new Random();
+    private byte[] buffer = new byte[BUFFER_SIZE];
+    private byte[] check  = new byte[BUFFER_SIZE];
+    private NutchFileSystem fs;
+    private boolean fastCheck;
+
+    {
+      try {
+        fs = NutchFileSystem.get(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public ReadMapper() { super(null); }
+    
+    public ReadMapper(Configuration conf) { super(conf); }
+
+    public void configure(JobConf job) {
+      setConf(job);
+      fastCheck = job.getBoolean("fs.test.fastCheck", false);
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter)
+      throws IOException {
+      String name = ((UTF8)key).toString();
+      long size = ((LongWritable)value).get();
+      long seed = Long.parseLong(name);
+
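+      // Regenerate the same pseudo-random stream that WriteMapper produced
+      // for this file and compare it, block by block, with what was stored.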
+      random.setSeed(seed);
+      reporter.setStatus("opening " + name);
+
+      DataInputStream in =
+        new DataInputStream(fs.open(new File(DATA_DIR, name)));
+
+      long read = 0;
+      try {
+        while (read < size) {
+          long remains = size - read;
+          int n = (remains<=buffer.length) ? (int)remains : buffer.length;
+          in.readFully(buffer, 0, n);
+          read += n;
+          if (fastCheck) {
+            Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE));
+          } else {
+            random.nextBytes(check);
+          }
+          if (n != buffer.length) {
+            Arrays.fill(buffer, n, buffer.length, (byte)0);
+            Arrays.fill(check, n, check.length, (byte)0);
+          }
+          assertTrue(Arrays.equals(buffer, check));
+
+          reporter.setStatus("reading "+name+"@"+read+"/"+size);
+
+        }
+      } finally {
+        in.close();
+      }
+
+      collector.collect(new UTF8("bytes"), new LongWritable(read));
+
+      reporter.setStatus("read " + name);
+    }
+  }
+
+  public static void readTest(NutchFileSystem fs, boolean fastCheck)
+    throws Exception {
+
+    fs.delete(READ_DIR);
+
+    JobConf job = new JobConf(conf);
+    job.setBoolean("fs.test.fastCheck", fastCheck);
+
+
+    job.setInputDir(CONTROL_DIR);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(LongWritable.class);
+
+    job.setMapperClass(ReadMapper.class);
+    job.setReducerClass(LongSumReducer.class);
+
+    job.setOutputDir(READ_DIR);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setNumReduceTasks(1);
+    JobClient.runJob(job);
+  }
+
+
+  public static class SeekMapper extends Configured implements Mapper {
+    private Random random = new Random();
+    private byte[] check  = new byte[BUFFER_SIZE];
+    private NutchFileSystem fs;
+    private boolean fastCheck;
+
+    {
+      try {
+        fs = NutchFileSystem.get(conf);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public SeekMapper() { super(null); }
+    
+    public SeekMapper(Configuration conf) { super(conf); }
+
+    public void configure(JobConf job) {
+      setConf(job);
+      fastCheck = job.getBoolean("fs.test.fastCheck", false);
+    }
+
+    public void map(WritableComparable key, Writable value,
+                    OutputCollector collector, Reporter reporter)
+      throws IOException {
+      String name = ((UTF8)key).toString();
+      long size = ((LongWritable)value).get();
+      long seed = Long.parseLong(name);
+
+      if (size == 0) return;
+
+      reporter.setStatus("opening " + name);
+
+      NFSDataInputStream in = fs.open(new File(DATA_DIR, name));
+        
+      try {
+        for (int i = 0; i < SEEKS_PER_FILE; i++) {
+          // generate a random position
+          long position = Math.abs(random.nextLong()) % size;
+          
+          // seek file to that position
+          reporter.setStatus("seeking " + name);
+          in.seek(position);
+          byte b = in.readByte();
+          
+          // check that byte matches
+          byte checkByte = 0;
+          // advance random state to that position
+          random.setSeed(seed);
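+          // Replay the generator in buffer-sized blocks up to the block that
+          // contains 'position'; the expected byte is taken from that block.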
+          for (int p = 0; p <= position; p+= check.length) {
+            reporter.setStatus("generating data for " + name);
+            if (fastCheck) {
+              checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
+            } else {
+              random.nextBytes(check);
+              checkByte = check[(int)(position % check.length)];
+            }
+          }
+          assertEquals(b, checkByte);
+        }
+      } finally {
+        in.close();
+      }
+    }
+  }
+
+  public static void seekTest(NutchFileSystem fs, boolean fastCheck)
+    throws Exception {
+
+    fs.delete(READ_DIR);
+
+    JobConf job = new JobConf(conf);
+    job.setBoolean("fs.test.fastCheck", fastCheck);
+
+    job.setInputDir(CONTROL_DIR);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(LongWritable.class);
+
+    job.setMapperClass(SeekMapper.class);
+    job.setReducerClass(LongSumReducer.class);
+
+    job.setOutputDir(READ_DIR);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(LongWritable.class);
+    job.setNumReduceTasks(1);
+    JobClient.runJob(job);
+  }
+
+
+  public static void main(String[] args) throws Exception {
+    int megaBytes = 10;
+    int files = 100;
+    boolean noRead = false;
+    boolean noWrite = false;
+    boolean noSeek = false;
+    boolean fastCheck = false;
+    long seed = new Random().nextLong();
+
+    String usage = "Usage: TestNutchFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]";
+    
+    if (args.length == 0) {
+        System.err.println(usage);
+        System.exit(-1);
+    }
+    for (int i = 0; i < args.length; i++) {       // parse command line
+      if (args[i].equals("-files")) {
+        files = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-megaBytes")) {
+        megaBytes = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noread")) {
+        noRead = true;
+      } else if (args[i].equals("-nowrite")) {
+        noWrite = true;
+      } else if (args[i].equals("-noseek")) {
+        noSeek = true;
+      } else if (args[i].equals("-fastcheck")) {
+        fastCheck = true;
+      }
+    }
+
+    LOG.info("seed = "+seed);
+    LOG.info("files = " + files);
+    LOG.info("megaBytes = " + megaBytes);
+  
+    NutchFileSystem fs = NutchFileSystem.get(conf);
+
+    if (!noWrite) {
+      createControlFile(fs, megaBytes*MEGA, files, seed);
+      writeTest(fs, fastCheck);
+    }
+    if (!noRead) {
+      readTest(fs, fastCheck);
+    }
+    if (!noSeek) {
+      seekTest(fs, fastCheck);
+    }
+  }
+
+}

+ 102 - 0
src/test/org/apache/hadoop/io/RandomDatum.java

@@ -0,0 +1,102 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.util.*;
+import java.io.*;
+
+public class RandomDatum implements WritableComparable {
+  private int length;
+  private byte[] data;
+
+  public RandomDatum() {}
+
+  public RandomDatum(Random random) {
+    length = 10 + random.nextInt(100);
+    data = new byte[length];
+    random.nextBytes(data);
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(length);
+    out.write(data);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    length = in.readInt();
+    if (data == null || length > data.length)
+      data = new byte[length];
+    in.readFully(data, 0, length);
+  }
+
+  public int compareTo(Object o) {
+    RandomDatum that = (RandomDatum)o;
+    return WritableComparator.compareBytes(this.data, 0, this.length,
+                                           that.data, 0, that.length);
+  }
+
+  public boolean equals(Object o) {
+    return compareTo(o) == 0;
+  }
+
+  private static final char[] HEX_DIGITS =
+  {'0','1','2','3','4','5','6','7','8','9','a','b','c','d','e','f'};
+
+  /** Returns a string representation of this object. */
+  public String toString() {
+    StringBuffer buf = new StringBuffer(length*2);
+    for (int i = 0; i < length; i++) {
+      int b = data[i];
+      buf.append(HEX_DIGITS[(b >> 4) & 0xf]);
+      buf.append(HEX_DIGITS[b & 0xf]);
+    }
+    return buf.toString();
+  }
+
+  public static class Generator {
+    Random random;
+
+    private RandomDatum key;
+    private RandomDatum value;
+    
+    public Generator() { random = new Random(); }
+    public Generator(int seed) { random = new Random(seed); }
+
+    public RandomDatum getKey() { return key; }
+    public RandomDatum getValue() { return value; }
+
+    public void next() {
+      key = new RandomDatum(random);
+      value = new RandomDatum(random);
+    }
+  }
+
+  /** A WritableComparator optimized for RandomDatum. */
+  public static class Comparator extends WritableComparator {
+    public Comparator() {
+      super(RandomDatum.class);
+    }
+
+    public int compare(byte[] b1, int s1, int l1,
+                       byte[] b2, int s2, int l2) {
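+      // A serialized RandomDatum is a 4-byte length followed by the data
+      // bytes, so skip the length prefix and compare only the payload bytes.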
+      int n1 = readInt(b1, s1);
+      int n2 = readInt(b2, s2);
+      return compareBytes(b1, s1+4, n1, b2, s2+4, n2);
+    }
+  }
+
+}

+ 150 - 0
src/test/org/apache/hadoop/io/TestArrayFile.java

@@ -0,0 +1,150 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+
+/** Unit tests for ArrayFile. */
+public class TestArrayFile extends TestCase {
+  private static Logger LOG = SequenceFile.LOG;
+  private static String FILE =
+    System.getProperty("test.build.data",".") + "/test.array";
+
+  public TestArrayFile(String name) { 
+      super(name); 
+  }
+
+  public void testArrayFile() throws Exception {
+    Configuration conf = new Configuration();
+    NutchFileSystem nfs = new LocalFileSystem(conf);
+    RandomDatum[] data = generate(10000);
+    writeTest(nfs, data, FILE);
+    readTest(nfs, data, FILE, conf);
+  }
+
+  public void testEmptyFile() throws Exception {
+    Configuration conf = new Configuration();
+    NutchFileSystem nfs = new LocalFileSystem(conf);
+    writeTest(nfs, new RandomDatum[0], FILE);
+    ArrayFile.Reader reader = new ArrayFile.Reader(nfs, FILE, conf);
+    assertNull(reader.get(0, new RandomDatum()));
+    reader.close();
+  }
+
+  private static RandomDatum[] generate(int count) {
+    LOG.fine("generating " + count + " records in memory");
+    RandomDatum[] data = new RandomDatum[count];
+    RandomDatum.Generator generator = new RandomDatum.Generator();
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      data[i] = generator.getValue();
+    }
+    return data;
+  }
+
+  private static void writeTest(NutchFileSystem nfs, RandomDatum[] data, String file)
+    throws IOException {
+    MapFile.delete(nfs, file);
+    LOG.fine("creating with " + data.length + " records");
+    ArrayFile.Writer writer = new ArrayFile.Writer(nfs, file, RandomDatum.class);
+    writer.setIndexInterval(100);
+    for (int i = 0; i < data.length; i++)
+      writer.append(data[i]);
+    writer.close();
+  }
+
+  private static void readTest(NutchFileSystem nfs, RandomDatum[] data, String file, Configuration conf)
+    throws IOException {
+    RandomDatum v = new RandomDatum();
+    LOG.fine("reading " + data.length + " records");
+    ArrayFile.Reader reader = new ArrayFile.Reader(nfs, file, conf);
+    for (int i = 0; i < data.length; i++) {       // try forwards
+      reader.get(i, v);
+      if (!v.equals(data[i])) {
+        throw new RuntimeException("wrong value at " + i);
+      }
+    }
+    for (int i = data.length-1; i >= 0; i--) {    // then backwards
+      reader.get(i, v);
+      if (!v.equals(data[i])) {
+        throw new RuntimeException("wrong value at " + i);
+      }
+    }
+    reader.close();
+    LOG.fine("done reading " + data.length + " records");
+  }
+
+
+  /** For debugging and testing. */
+  public static void main(String[] args) throws Exception {
+    int count = 1024 * 1024;
+    boolean create = true;
+    boolean check = true;
+    String file = FILE;
+    String usage = "Usage: TestArrayFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] file";
+      
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    Configuration conf = new Configuration();
+    int i = 0;
+    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i, conf);
+    try {
+        for (; i < args.length; i++) {       // parse command line
+            if (args[i] == null) {
+                continue;
+            } else if (args[i].equals("-count")) {
+                count = Integer.parseInt(args[++i]);
+            } else if (args[i].equals("-nocreate")) {
+                create = false;
+            } else if (args[i].equals("-nocheck")) {
+                check = false;
+            } else {                                       
+                // file is required parameter
+                file = args[i];
+            }
+        }
+
+        LOG.info("count = " + count);
+        LOG.info("create = " + create);
+        LOG.info("check = " + check);
+        LOG.info("file = " + file);
+
+        LOG.setLevel(Level.FINE);
+
+        RandomDatum[] data = generate(count);
+
+        if (create) {
+            writeTest(nfs, data, file);
+        }
+
+        if (check) {
+            readTest(nfs, data, file, conf);
+        }
+    } finally {
+        nfs.close();
+    }
+  }
+}

+ 70 - 0
src/test/org/apache/hadoop/io/TestMD5Hash.java

@@ -0,0 +1,70 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import org.apache.hadoop.io.TestWritable;
+import junit.framework.TestCase;
+import java.security.MessageDigest;
+import java.util.Random;
+
+/** Unit tests for MD5Hash. */
+public class TestMD5Hash extends TestCase {
+  public TestMD5Hash(String name) { super(name); }
+
+  private static final Random RANDOM = new Random();
+
+  public static MD5Hash getTestHash() throws Exception {
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    byte[] buffer = new byte[1024];
+    RANDOM.nextBytes(buffer);
+    digest.update(buffer);
+    return new MD5Hash(digest.digest());
+  }
+
+  public void testMD5Hash() throws Exception {
+    MD5Hash md5Hash = getTestHash();
+
+    MD5Hash md5Hash00
+      = new MD5Hash(new byte[] {0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0});
+
+    MD5Hash md5HashFF
+      = new MD5Hash(new byte[] {-1,-1,-1,-1,-1,-1,-1,-1,
+                                -1,-1,-1,-1,-1,-1,-1,-1});
+
+    // test i/o
+    TestWritable.testWritable(md5Hash);
+    TestWritable.testWritable(md5Hash00);
+    TestWritable.testWritable(md5HashFF);
+
+    // test equals()
+    assertEquals(md5Hash, md5Hash);
+    assertEquals(md5Hash00, md5Hash00);
+    assertEquals(md5HashFF, md5HashFF);
+
+    // test compareTo()
+    assertTrue(md5Hash.compareTo(md5Hash) == 0);
+    assertTrue(md5Hash00.compareTo(md5Hash) < 0);
+    assertTrue(md5HashFF.compareTo(md5Hash) > 0);
+
+    // test toString and string ctor
+    assertEquals(md5Hash, new MD5Hash(md5Hash.toString()));
+    assertEquals(md5Hash00, new MD5Hash(md5Hash00.toString()));
+    assertEquals(md5HashFF, new MD5Hash(md5HashFF.toString()));
+
+  }
+	
+}

+ 285 - 0
src/test/org/apache/hadoop/io/TestSequenceFile.java

@@ -0,0 +1,285 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+
+
+/** Unit tests for SequenceFile. */
+public class TestSequenceFile extends TestCase {
+  private static Logger LOG = SequenceFile.LOG;
+
+  private static Configuration conf = new Configuration();
+  
+  public TestSequenceFile(String name) { super(name); }
+
+  /** Unit tests for SequenceFile. */
+  public void testSequenceFile() throws Exception {
+    int count = 1024 * 10;
+    int megabytes = 1;
+    int factor = 5;
+    String file = System.getProperty("test.build.data",".") + "/test.seq";
+ 
+    int seed = new Random().nextInt();
+
+    NutchFileSystem nfs = new LocalFileSystem(new Configuration());
+    try {
+        //LOG.setLevel(Level.FINE);
+        writeTest(nfs, count, seed, file, false);
+        readTest(nfs, count, seed, file);
+
+        sortTest(nfs, count, megabytes, factor, false, file);
+        checkSort(nfs, count, seed, file);
+
+        sortTest(nfs, count, megabytes, factor, true, file);
+        checkSort(nfs, count, seed, file);
+
+        mergeTest(nfs, count, seed, file, false, factor, megabytes);
+        checkSort(nfs, count, seed, file);
+
+        mergeTest(nfs, count, seed, file, true, factor, megabytes);
+        checkSort(nfs, count, seed, file);
+    } finally {
+        nfs.close();
+    }
+  }
+
+  private static void writeTest(NutchFileSystem nfs, int count, int seed,
+                                String file, boolean compress)
+    throws IOException {
+    new File(file).delete();
+    LOG.fine("creating with " + count + " records");
+    SequenceFile.Writer writer =
+      new SequenceFile.Writer(nfs, file, RandomDatum.class, RandomDatum.class,
+                              compress);
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      writer.append(key, value);
+    }
+    writer.close();
+  }
+
+  private static void readTest(NutchFileSystem nfs, int count, int seed, String file)
+    throws IOException {
+    RandomDatum k = new RandomDatum();
+    RandomDatum v = new RandomDatum();
+    LOG.fine("reading " + count + " records");
+    SequenceFile.Reader reader = new SequenceFile.Reader(nfs, file, conf);
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+      
+      reader.next(k, v);
+      
+      if (!k.equals(key))
+        throw new RuntimeException("wrong key at " + i);
+      if (!v.equals(value))
+        throw new RuntimeException("wrong value at " + i);
+    }
+    reader.close();
+  }
+
+
+  private static void sortTest(NutchFileSystem nfs, int count, int megabytes, 
+                               int factor, boolean fast, String file)
+    throws IOException {
+    new File(file+".sorted").delete();
+    SequenceFile.Sorter sorter = newSorter(nfs, fast, megabytes, factor);
+    LOG.fine("sorting " + count + " records");
+    sorter.sort(file, file+".sorted");
+    LOG.fine("done sorting " + count + " records");
+  }
+
+  private static void checkSort(NutchFileSystem nfs, int count, int seed, String file)
+    throws IOException {
+    LOG.fine("sorting " + count + " records in memory for check");
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+    SortedMap map = new TreeMap();
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+      map.put(key, value);
+    }
+
+    LOG.fine("checking order of " + count + " records");
+    RandomDatum k = new RandomDatum();
+    RandomDatum v = new RandomDatum();
+    Iterator iterator = map.entrySet().iterator();
+    SequenceFile.Reader reader = new SequenceFile.Reader(nfs, file + ".sorted", conf);
+    for (int i = 0; i < count; i++) {
+      Map.Entry entry = (Map.Entry)iterator.next();
+      RandomDatum key = (RandomDatum)entry.getKey();
+      RandomDatum value = (RandomDatum)entry.getValue();
+
+      reader.next(k, v);
+
+      if (!k.equals(key))
+        throw new RuntimeException("wrong key at " + i);
+      if (!v.equals(value))
+        throw new RuntimeException("wrong value at " + i);
+    }
+
+    reader.close();
+    LOG.fine("sucessfully checked " + count + " records");
+  }
+
+  private static void mergeTest(NutchFileSystem nfs, int count, int seed, 
+                                String file, boolean fast, int factor, 
+                                int megabytes)
+    throws IOException {
+
+    LOG.fine("creating "+factor+" files with "+count/factor+" records");
+
+    SequenceFile.Writer[] writers = new SequenceFile.Writer[factor];
+    String[] names = new String[factor];
+    String[] sortedNames = new String[factor];
+    
+    for (int i = 0; i < factor; i++) {
+      names[i] = file+"."+i;
+      sortedNames[i] = names[i] + ".sorted";
+      nfs.delete(new File(names[i]));
+      nfs.delete(new File(sortedNames[i]));
+      writers[i] =
+        new SequenceFile.Writer(nfs, names[i], RandomDatum.class,RandomDatum.class);
+    }
+
+    RandomDatum.Generator generator = new RandomDatum.Generator(seed);
+
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      RandomDatum key = generator.getKey();
+      RandomDatum value = generator.getValue();
+
+      writers[i%factor].append(key, value);
+    }
+
+    for (int i = 0; i < factor; i++)
+      writers[i].close();
+
+    for (int i = 0; i < factor; i++) {
+      LOG.fine("sorting file " + i + " with " + count/factor + " records");
+      newSorter(nfs, fast, megabytes, factor).sort(names[i], sortedNames[i]);
+    }
+
+    LOG.fine("merging " + factor + " files with " + count/factor + " records");
+    nfs.delete(new File(file+".sorted"));
+    newSorter(nfs, fast, megabytes, factor).merge(sortedNames, file+".sorted");
+  }
+
+  private static SequenceFile.Sorter newSorter(NutchFileSystem nfs, 
+                                               boolean fast,
+                                               int megabytes, int factor) {
+    SequenceFile.Sorter sorter = 
+      fast
+      ? new SequenceFile.Sorter(nfs, new RandomDatum.Comparator(),RandomDatum.class, conf)
+      : new SequenceFile.Sorter(nfs, RandomDatum.class, RandomDatum.class, conf);
+    sorter.setMemory(megabytes * 1024*1024);
+    sorter.setFactor(factor);
+    return sorter;
+  }
+
+
+  /** For debugging and testing. */
+  public static void main(String[] args) throws Exception {
+    int count = 1024 * 1024;
+    int megabytes = 1;
+    int factor = 10;
+    boolean create = true;
+    boolean check = false;
+    boolean fast = false;
+    boolean merge = false;
+    boolean compress = false;
+    String file = null;
+    String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) [-count N] [-megabytes M] [-factor F] [-nocreate] [-check] [-fast] [-merge] [-compress] file";
+    
+    if (args.length == 0) {
+        System.err.println(usage);
+        System.exit(-1);
+    }
+    int i = 0;
+    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i, conf);      
+    try {
+      for (; i < args.length; i++) {       // parse command line
+          if (args[i] == null) {
+              continue;
+          } else if (args[i].equals("-count")) {
+              count = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-megabytes")) {
+              megabytes = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-factor")) {
+              factor = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-nocreate")) {
+              create = false;
+          } else if (args[i].equals("-check")) {
+              check = true;
+          } else if (args[i].equals("-fast")) {
+              fast = true;
+          } else if (args[i].equals("-merge")) {
+              merge = true;
+          } else if (args[i].equals("-compress")) {
+              compress = true;
+          } else {
+              // file is required parameter
+              file = args[i];
+          }
+        }
+        LOG.info("count = " + count);
+        LOG.info("megabytes = " + megabytes);
+        LOG.info("factor = " + factor);
+        LOG.info("create = " + create);
+        LOG.info("check = " + check);
+        LOG.info("fast = " + fast);
+        LOG.info("merge = " + merge);
+        LOG.info("compress = " + compress);
+        LOG.info("file = " + file);
+
+        int seed = 0;
+ 
+        LOG.setLevel(Level.FINE);
+
+        if (create && !merge) {
+            writeTest(nfs, count, seed, file, compress);
+            readTest(nfs, count, seed, file);
+        }
+
+        if (merge) {
+            mergeTest(nfs, count, seed, file, fast, factor, megabytes);
+        } else {
+            sortTest(nfs, count, megabytes, factor, fast, file);
+        }
+    
+        if (check) {
+            checkSort(nfs, count, seed, file);
+        }
+      } finally {
+          nfs.close();
+      }
+  }
+}

+ 136 - 0
src/test/org/apache/hadoop/io/TestSetFile.java

@@ -0,0 +1,136 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.conf.Configuration;
+
+/** Unit tests for SetFile. */
+public class TestSetFile extends TestCase {
+  private static Logger LOG = SequenceFile.LOG;
+  private static String FILE =
+    System.getProperty("test.build.data",".") + "/test.set";
+
+  private static Configuration conf = new Configuration();
+  
+  public TestSetFile(String name) { super(name); }
+
+  public void testSetFile() throws Exception {
+    NutchFileSystem nfs = new LocalFileSystem(conf);
+    try {
+        RandomDatum[] data = generate(10000);
+        writeTest(nfs, data, FILE);
+        readTest(nfs, data, FILE);
+    } finally {
+        nfs.close();
+    }
+  }
+
+  private static RandomDatum[] generate(int count) {
+    LOG.fine("generating " + count + " records in memory");
+    RandomDatum[] data = new RandomDatum[count];
+    RandomDatum.Generator generator = new RandomDatum.Generator();
+    for (int i = 0; i < count; i++) {
+      generator.next();
+      data[i] = generator.getValue();
+    }
+    LOG.fine("sorting " + count + " records in memory");
+    Arrays.sort(data);
+    return data;
+  }
+
+  private static void writeTest(NutchFileSystem nfs, RandomDatum[] data, String file)
+    throws IOException {
+    MapFile.delete(nfs, file);
+    LOG.fine("creating with " + data.length + " records");
+    SetFile.Writer writer = new SetFile.Writer(nfs, file, RandomDatum.class);
+    for (int i = 0; i < data.length; i++)
+      writer.append(data[i]);
+    writer.close();
+  }
+
+  private static void readTest(NutchFileSystem nfs, RandomDatum[] data, String file)
+    throws IOException {
+    RandomDatum v = new RandomDatum();
+    LOG.fine("reading " + data.length + " records");
+    SetFile.Reader reader = new SetFile.Reader(nfs, file, conf);
+    for (int i = 0; i < data.length; i++) {
+      if (!reader.seek(data[i]))
+        throw new RuntimeException("wrong value at " + i);
+    }
+    reader.close();
+    LOG.fine("done reading " + data.length + " records");
+  }
+
+
+  /** For debugging and testing. */
+  public static void main(String[] args) throws Exception {
+    int count = 1024 * 1024;
+    boolean create = true;
+    boolean check = true;
+    String file = FILE;
+    String usage = "Usage: TestSetFile (-local | -dfs <namenode:port>) [-count N] [-nocreate] [-nocheck] file";
+      
+    if (args.length == 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+      
+    int i = 0;
+    NutchFileSystem nfs = NutchFileSystem.parseArgs(args, i, conf);      
+    try {
+      for (; i < args.length; i++) {       // parse command line
+        if (args[i] == null) {
+          continue;
+        } else if (args[i].equals("-count")) {
+          count = Integer.parseInt(args[++i]);
+        } else if (args[i].equals("-nocreate")) {
+          create = false;
+        } else if (args[i].equals("-nocheck")) {
+          check = false;
+        } else {
+          // file is required parameter
+          file = args[i];
+        }
+      }
+
+        LOG.info("count = " + count);
+        LOG.info("create = " + create);
+        LOG.info("check = " + check);
+        LOG.info("file = " + file);
+
+        LOG.setLevel(Level.FINE);
+
+        RandomDatum[] data = generate(count);
+
+        if (create) {
+          writeTest(nfs, data, file);
+        }
+
+        if (check) {
+          readTest(nfs, data, file);
+        }
+    } finally {
+      nfs.close();
+    }
+  }
+}

+ 85 - 0
src/test/org/apache/hadoop/io/TestUTF8.java

@@ -0,0 +1,85 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import junit.framework.TestCase;
+import java.util.Random;
+import java.util.Arrays;
+
+/** Unit tests for UTF8. */
+public class TestUTF8 extends TestCase {
+  public TestUTF8(String name) { super(name); }
+
+  private static final Random RANDOM = new Random();
+
+  public static String getTestString() throws Exception {
+    StringBuffer buffer = new StringBuffer();
+    int length = RANDOM.nextInt(100);
+    for (int i = 0; i < length; i++) {
+      buffer.append((char)(RANDOM.nextInt(Character.MAX_VALUE)));
+    }
+    return buffer.toString();
+  }
+
+  public void testWritable() throws Exception {
+    for (int i = 0; i < 10; i++) {
+      TestWritable.testWritable(new UTF8(getTestString()));
+    }
+  }
+
+  public void testGetBytes() throws Exception {
+    for (int i = 0; i < 10; i++) {
+
+      // generate a random string
+      String before = getTestString();
+
+      // check that it round-trips through UTF-8 encoding
+      assertEquals(before, new String(UTF8.getBytes(before), "UTF-8"));
+    }
+  }
+
+  public void testIO() throws Exception {
+    DataOutputBuffer out = new DataOutputBuffer();
+    DataInputBuffer in = new DataInputBuffer();
+
+    for (int i = 0; i < 10; i++) {
+      // generate a random string
+      String before = getTestString();
+
+      // write it
+      out.reset();
+      UTF8.writeString(out, before);
+
+      // test that it reads correctly
+      in.reset(out.getData(), out.getLength());
+      String after = UTF8.readString(in);
+      assertTrue(before.equals(after));
+
+      // test that it reads correctly with DataInput
+      in.reset(out.getData(), out.getLength());
+      String after2 = in.readUTF();
+      assertTrue(before.equals(after2));
+
+      // test that it is compatible with Java's other decoder
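+      // (skip the leading 2-byte length prefix so only the UTF-8 payload is decoded)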
+      String after3 = new String(out.getData(), 2, out.getLength()-2, "UTF-8");
+      assertTrue(before.equals(after3));
+
+    }
+
+  }
+	
+}

+ 177 - 0
src/test/org/apache/hadoop/io/TestVersionedWritable.java

@@ -0,0 +1,177 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.util.Random;
+import junit.framework.TestCase;
+
+/** Unit tests for VersionedWritable. */
+
+public class TestVersionedWritable extends TestCase {
+
+	public TestVersionedWritable(String name) { super(name); }
+	
+	
+	/** Example class used in test cases below. */
+	public static class SimpleVersionedWritable extends VersionedWritable {
+		
+		private static final Random RANDOM = new Random();
+		int state = RANDOM.nextInt();
+
+		
+		private static byte VERSION = 1;
+		public byte getVersion() { 
+			return VERSION; 
+		}		
+		
+
+		public void write(DataOutput out) throws IOException {
+			super.write(out); // version.
+			out.writeInt(state);
+		}
+		
+		public void readFields(DataInput in) throws IOException {
+			super.readFields(in); // version
+			this.state = in.readInt();
+		}
+		
+
+		public static SimpleVersionedWritable read(DataInput in) throws IOException {
+			SimpleVersionedWritable result = new SimpleVersionedWritable();
+			result.readFields(in);
+			return result;
+		}
+		
+
+		/** Required by test code, below. */
+		public boolean equals(Object o) {
+			if (!(o instanceof SimpleVersionedWritable))
+				return false;
+			SimpleVersionedWritable other = (SimpleVersionedWritable)o;
+			return this.state == other.state;
+		}
+
+	}
+
+
+	
+	public static class AdvancedVersionedWritable extends SimpleVersionedWritable {
+
+		String shortTestString = "Now is the time for all good men to come to the aid of the Party";
+		String longTestString = "Four score and twenty years ago. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah.";
+		
+		String compressableTestString = 
+			"Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. " +
+			"Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. " +
+			"Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. Blah. " ;
+
+		SimpleVersionedWritable containedObject = new SimpleVersionedWritable();
+		String[] testStringArray = {"The", "Quick", "Brown", "Fox", "Jumped", "Over", "The", "Lazy", "Dog"};
+
+		public void write(DataOutput out) throws IOException {
+			super.write(out);
+			out.writeUTF(shortTestString); 
+			WritableUtils.writeString(out,longTestString); 
+			int comp = WritableUtils.writeCompressedString(out,compressableTestString); 
+			System.out.println("Compression is " + comp + "%");
+			containedObject.write(out); // Warning if this is a recursive call, you need a null value.
+			WritableUtils.writeStringArray(out,testStringArray); 
+
+		}
+		
+		
+		public void readFields(DataInput in) throws IOException {
+			super.readFields(in);
+			shortTestString = in.readUTF();
+			longTestString = WritableUtils.readString(in); 
+			compressableTestString = WritableUtils.readCompressedString(in);
+			containedObject.readFields(in); // Warning if this is a recursive call, you need a null value.
+			testStringArray = WritableUtils.readStringArray(in); 
+		}
+			
+
+
+		public boolean equals(Object o) {
+			super.equals(o);
+
+			if (!shortTestString.equals(((AdvancedVersionedWritable)o).shortTestString)) { return false;}
+			if (!longTestString.equals(((AdvancedVersionedWritable)o).longTestString)) { return false;}
+			if (!compressableTestString.equals(((AdvancedVersionedWritable)o).compressableTestString)) { return false;}
+			
+			if (testStringArray.length != ((AdvancedVersionedWritable)o).testStringArray.length) { return false;}
+			for(int i=0;i< testStringArray.length;i++){
+				if (!testStringArray[i].equals(((AdvancedVersionedWritable)o).testStringArray[i])) {
+					return false;
+				}
+			}
+			
+			if (!containedObject.equals(((AdvancedVersionedWritable)o).containedObject)) { return false;}
+			
+			return true;
+		}
+		
+
+
+	}
+
+	/* This one checks that version mismatch is thrown... */
+	public static class SimpleVersionedWritableV2 extends SimpleVersionedWritable {
+		static byte VERSION = 2;
+		public byte getVersion() { 
+			return VERSION; 
+		}		
+	}
+
+
+	/** Test 1: Check that SimpleVersionedWritable round-trips correctly. */
+	public void testSimpleVersionedWritable() throws Exception {
+		TestWritable.testWritable(new SimpleVersionedWritable());
+	}
+
+	/** Test 2: Check that AdvancedVersionedWritable Works (well, why wouldn't it!). */
+	public void testAdvancedVersionedWritable() throws Exception {
+		TestWritable.testWritable(new AdvancedVersionedWritable());
+	}
+
+	/** Test 3: Check that reading with a mismatched version throws an exception. */
+	public void testSimpleVersionedWritableMismatch() throws Exception {
+		TestVersionedWritable.testVersionedWritable(new SimpleVersionedWritable(), new SimpleVersionedWritableV2());
+	}
+
+
+
+	
+  /** Utility method for testing VersionedWritables. */
+  public static void testVersionedWritable(Writable before, Writable after) throws Exception {
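+      // Serialize 'before', then read it back into 'after'; when the two
+      // declare different versions, readFields must throw VersionMismatchException.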
+      DataOutputBuffer dob = new DataOutputBuffer();
+      before.write(dob);
+	
+      DataInputBuffer dib = new DataInputBuffer();
+      dib.reset(dob.getData(), dob.getLength());
+
+      try {
+          after.readFields(dib);
+      } catch (VersionMismatchException vmme) {
+          System.out.println("Good, we expected this:" + vmme);
+          return;
+      }
+	
+      throw new Exception("A Version Mismatch Didn't Happen!");
+  }
+}
+

+ 82 - 0
src/test/org/apache/hadoop/io/TestWritable.java

@@ -0,0 +1,82 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+import java.util.Random;
+import junit.framework.TestCase;
+import org.apache.hadoop.io.*;
+import org.apache.nutch.parse.ParseData;
+import org.apache.hadoop.conf.Configuration;
+
+/** Unit tests for Writable. */
+public class TestWritable extends TestCase {
+  public TestWritable(String name) { super(name); }
+
+  /** Example class used in test cases below. */
+  public static class SimpleWritable implements Writable {
+    private static final Random RANDOM = new Random();
+
+    int state = RANDOM.nextInt();
+
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(state);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      this.state = in.readInt();
+    }
+
+    public static SimpleWritable read(DataInput in) throws IOException {
+      SimpleWritable result = new SimpleWritable();
+      result.readFields(in);
+      return result;
+    }
+
+    /** Required by test code, below. */
+    public boolean equals(Object o) {
+      if (!(o instanceof SimpleWritable))
+        return false;
+      SimpleWritable other = (SimpleWritable)o;
+      return this.state == other.state;
+    }
+  }
+
+  /** Test 1: Check that SimpleWritable round-trips correctly. */
+  public void testSimpleWritable() throws Exception {
+    testWritable(new SimpleWritable());
+  }
+
+  /** Utility method for testing writables. */
+  public static void testWritable(Writable before) throws Exception {
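+      // Round-trip: serialize 'before', deserialize into a fresh instance
+      // created via reflection, and require the two objects to be equal.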
+      DataOutputBuffer dob = new DataOutputBuffer();
+      before.write(dob);
+
+      DataInputBuffer dib = new DataInputBuffer();
+      dib.reset(dob.getData(), dob.getLength());
+    
+      Writable after = (Writable)before.getClass().newInstance();
+      if(after instanceof ParseData) {
+        ParseData parseData = (ParseData) after;
+        parseData.setConf(new Configuration());
+      }
+      after.readFields(dib);
+
+      assertEquals(before, after);
+  }
+	
+}

+ 224 - 0
src/test/org/apache/hadoop/ipc/TestIPC.java

@@ -0,0 +1,224 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.Random;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import junit.framework.TestCase;
+
+import java.util.logging.Logger;
+import java.util.logging.Level;
+
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.conf.Configuration;
+
+/** Unit tests for IPC. */
+public class TestIPC extends TestCase {
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.hadoop.ipc.TestIPC");
+
+  private static Configuration conf = new Configuration();
+  
+  // quiet during testing, since output ends up on console
+  static {
+    LOG.setLevel(Level.WARNING);
+    Client.LOG.setLevel(Level.WARNING);
+    Server.LOG.setLevel(Level.WARNING);
+  }
+
+  public TestIPC(String name) { super(name); }
+
+  private static final Random RANDOM = new Random();
+
+  private static final int PORT = 1234;
+
+  private static class TestServer extends Server {
+    private boolean sleep;
+
+    public TestServer(int port, int handlerCount, boolean sleep) {
+      super(port, LongWritable.class, handlerCount, conf);
+      this.setTimeout(1000);
+      this.sleep = sleep;
+    }
+
+    public Writable call(Writable param) throws IOException {
+      if (sleep) {
+        try {
+          Thread.sleep(RANDOM.nextInt(200));      // sleep a bit
+        } catch (InterruptedException e) {}
+      }
+      return param;                               // echo param as result
+    }
+  }
+
+  private static class SerialCaller extends Thread {
+    private Client client;
+    private int count;
+    private boolean failed;
+
+    public SerialCaller(Client client, int count) {
+      this.client = client;
+      this.count = count;
+      client.setTimeout(1000);
+    }
+
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          LongWritable param = new LongWritable(RANDOM.nextLong());
+          LongWritable value =
+            (LongWritable)client.call(param, new InetSocketAddress(PORT));
+          if (!param.equals(value)) {
+            LOG.severe("Call failed!");
+            failed = true;
+            break;
+          }
+        } catch (Exception e) {
+          LOG.severe("Caught: " + e);
+          failed = true;
+        }
+      }
+    }
+  }
+
+  private static class ParallelCaller extends Thread {
+    private Client client;
+    private int count;
+    private InetSocketAddress[] addresses;
+    private boolean failed;
+    
+    public ParallelCaller(Client client, InetSocketAddress[] addresses,
+                          int count) {
+      this.client = client;
+      this.addresses = addresses;
+      this.count = count;
+      client.setTimeout(1000);
+    }
+
+    public void run() {
+      for (int i = 0; i < count; i++) {
+        try {
+          Writable[] params = new Writable[addresses.length];
+          for (int j = 0; j < addresses.length; j++)
+            params[j] = new LongWritable(RANDOM.nextLong());
+          Writable[] values = client.call(params, addresses);
+          for (int j = 0; j < addresses.length; j++) {
+            if (!params[j].equals(values[j])) {
+              LOG.severe("Call failed!");
+              failed = true;
+              break;
+            }
+          }
+        } catch (Exception e) {
+          LOG.severe("Caught: " + e);
+          failed = true;
+        }
+      }
+    }
+  }
+
+  public void testSerial() throws Exception {
+    testSerial(3, false, 2, 5, 100);
+  }
+
+  public void testSerial(int handlerCount, boolean handlerSleep, 
+                          int clientCount, int callerCount, int callCount)
+    throws Exception {
+    Server server = new TestServer(PORT, handlerCount, handlerSleep);
+    server.start();
+
+    Client[] clients = new Client[clientCount];
+    for (int i = 0; i < clientCount; i++) {
+      clients[i] = new Client(LongWritable.class, conf);
+    }
+    
+    SerialCaller[] callers = new SerialCaller[callerCount];
+    for (int i = 0; i < callerCount; i++) {
+      callers[i] = new SerialCaller(clients[i%clientCount], callCount);
+      callers[i].start();
+    }
+    for (int i = 0; i < callerCount; i++) {
+      callers[i].join();
+      assertFalse(callers[i].failed);
+    }
+    for (int i = 0; i < clientCount; i++) {
+      clients[i].stop();
+    }
+    server.stop();
+  }
+	
+  public void testParallel() throws Exception {
+    testParallel(10, false, 2, 4, 2, 4, 100);
+  }
+
+  public void testParallel(int handlerCount, boolean handlerSleep,
+                           int serverCount, int addressCount,
+                           int clientCount, int callerCount, int callCount)
+    throws Exception {
+    Server[] servers = new Server[serverCount];
+    for (int i = 0; i < serverCount; i++) {
+      servers[i] = new TestServer(PORT+i, handlerCount, handlerSleep);
+      servers[i].start();
+    }
+
+    InetSocketAddress[] addresses = new InetSocketAddress[addressCount];
+    for (int i = 0; i < addressCount; i++) {
+      addresses[i] = new InetSocketAddress(PORT+(i%serverCount));
+    }
+
+    Client[] clients = new Client[clientCount];
+    for (int i = 0; i < clientCount; i++) {
+      clients[i] = new Client(LongWritable.class, conf);
+    }
+    
+    ParallelCaller[] callers = new ParallelCaller[callerCount];
+    for (int i = 0; i < callerCount; i++) {
+      callers[i] =
+        new ParallelCaller(clients[i%clientCount], addresses, callCount);
+      callers[i].start();
+    }
+    for (int i = 0; i < callerCount; i++) {
+      callers[i].join();
+      assertFalse(callers[i].failed);
+    }
+    for (int i = 0; i < clientCount; i++) {
+      clients[i].stop();
+    }
+    for (int i = 0; i < serverCount; i++) {
+      servers[i].stop();
+    }
+  }
+	
+  public static void main(String[] args) throws Exception {
+    // crank up the volume!
+    LOG.setLevel(Level.FINE);
+    Client.LOG.setLevel(Level.FINE);
+    Server.LOG.setLevel(Level.FINE);
+    LogFormatter.setShowThreadIDs(true);
+
+    //new TestIPC("test").testSerial(5, false, 2, 10, 1000);
+
+    new TestIPC("test").testParallel(10, false, 2, 4, 2, 4, 1000);
+
+  }
+
+}

+ 141 - 0
src/test/org/apache/hadoop/ipc/TestRPC.java

@@ -0,0 +1,141 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.lang.reflect.Method;
+
+import junit.framework.TestCase;
+
+import java.util.logging.Logger;
+import java.util.logging.Level;
+import java.util.Arrays;
+
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.conf.Configuration;
+
+/** Unit tests for RPC. */
+public class TestRPC extends TestCase {
+  private static final int PORT = 1234;
+
+  public static final Logger LOG =
+    LogFormatter.getLogger("org.apache.hadoop.ipc.TestRPC");
+  
+  private static Configuration conf = new Configuration();
+
+  // quiet during testing, since output ends up on console
+  static {
+    LOG.setLevel(Level.WARNING);
+    Client.LOG.setLevel(Level.WARNING);
+    Server.LOG.setLevel(Level.WARNING);
+  }
+
+  public TestRPC(String name) { super(name); }
+	
+  public interface TestProtocol {
+    void ping() throws IOException;
+    String echo(String value) throws IOException;
+    String[] echo(String[] value) throws IOException;
+    int add(int v1, int v2) throws IOException;
+    int add(int[] values) throws IOException;
+    int error() throws IOException;
+  }
+
+  public class TestImpl implements TestProtocol {
+
+    public void ping() {}
+
+    public String echo(String value) throws IOException { return value; }
+
+    public String[] echo(String[] values) throws IOException { return values; }
+
+    public int add(int v1, int v2) {
+      return v1 + v2;
+    }
+
+    public int add(int[] values) {
+      int sum = 0;
+      for (int i = 0; i < values.length; i++) {
+        sum += values[i];
+      }
+      return sum;
+    }
+
+    public int error() throws IOException {
+      throw new IOException("bobo");
+    }
+
+  }
+
+  public void testCalls() throws Exception {
+    Server server = RPC.getServer(new TestImpl(), PORT, conf);
+    server.start();
+
+    InetSocketAddress addr = new InetSocketAddress(PORT);
+    TestProtocol proxy =
+      (TestProtocol)RPC.getProxy(TestProtocol.class, addr, conf);
+    
+    proxy.ping();
+
+    String stringResult = proxy.echo("foo");
+    assertEquals(stringResult, "foo");
+
+    String[] stringResults = proxy.echo(new String[]{"foo","bar"});
+    assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));
+
+    int intResult = proxy.add(1, 2);
+    assertEquals(intResult, 3);
+
+    intResult = proxy.add(new int[] {1, 2});
+    assertEquals(intResult, 3);
+
+    boolean caught = false;
+    try {
+      proxy.error();
+    } catch (IOException e) {
+      LOG.fine("Caught " + e);
+      caught = true;
+    }
+    assertTrue(caught);
+
+    // try some multi-calls
+    Method echo =
+      TestProtocol.class.getMethod("echo", new Class[] { String.class });
+    String[] strings = (String[])RPC.call(echo, new String[][]{{"a"},{"b"}},
+                                         new InetSocketAddress[] {addr, addr}, conf);
+    assertTrue(Arrays.equals(strings, new String[]{"a","b"}));
+
+    Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
+    Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}},
+                                        new InetSocketAddress[] {addr, addr}, conf);
+    assertEquals(voids, null);
+
+    server.stop();
+  }
+  public static void main(String[] args) throws Exception {
+    // crank up the volume!
+    LOG.setLevel(Level.FINE);
+    Client.LOG.setLevel(Level.FINE);
+    Server.LOG.setLevel(Level.FINE);
+    LogFormatter.setShowThreadIDs(true);
+
+    new TestRPC("test").testCalls();
+
+  }
+
+}
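
The calls above exercise the whole client-side RPC surface: RPC.getServer exports an implementation object on a port, RPC.getProxy returns a stub implementing the protocol interface, and the static RPC.call form fans one invocation out across several addresses. A condensed sketch of the single-proxy sequence testCalls performs (written as it would appear inside the test class, since TestImpl is an inner class; error handling omitted):

    Server server = RPC.getServer(new TestImpl(), PORT, conf);
    server.start();

    InetSocketAddress addr = new InetSocketAddress(PORT);
    TestProtocol proxy =
      (TestProtocol) RPC.getProxy(TestProtocol.class, addr, conf);

    proxy.ping();                  // void round trip
    int sum = proxy.add(1, 2);     // returns 3 via the remote TestImpl

    server.stop();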

+ 317 - 0
src/test/org/apache/hadoop/mapred/MapredLoadTest.java

@@ -0,0 +1,317 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.lib.*;
+
+import java.io.*;
+import java.util.*;
+import java.math.*;
+
+/**********************************************************
+ * MapredLoadTest generates a bunch of work that exercises
+ * a Nutch Map-Reduce system (and DFS, too).  It goes through
+ * the following steps:
+ *
+ * 1) Take inputs 'range' and 'counts'.
+ * 2) Generate 'counts' random integers between 0 and range-1.
+ * 3) Create a file that lists each integer between 0 and range-1,
+ *    and lists the number of times that integer was generated.
+ * 4) Emit a (very large) file that contains all the integers
+ *    in the order generated.
+ * 5) After the file has been generated, read it back and count
+ *    how many times each int was generated.
+ * 6) Compare this big count-map against the original one.  If
+ *    they match, then SUCCESS!  Otherwise, FAILURE!
+ *
+ * OK, that's how we can think about it.  What are the map-reduce
+ * steps that get the job done?
+ *
+ * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
+ * 2) In a non-mapred thread, generate the answer-key and write to disk.
+ * 3) In a mapred job, divide the answer key into K jobs.
+ * 4) A mapred 'generator' task consists of K map jobs.  Each reads
+ *    an individual "sub-key", and generates integers according
+ *    to it (though with a random ordering).
+ * 5) The generator's reduce task agglomerates all of those files
+ *    into a single one.
+ * 6) A mapred 'reader' task consists of M map jobs.  The output
+ *    file is cut into M pieces. Each of the M jobs counts the 
+ *    individual ints in its chunk and creates a map of all seen ints.
+ * 7) A mapred job integrates all the count files into a single one.
+ *
+ **********************************************************/
+public class MapredLoadTest {
+    static class RandomGenMapper implements Mapper {
+        Random r = new Random();
+        public void configure(JobConf job) {
+        }
+
+        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+            int randomVal = ((IntWritable) key).get();
+            int randomCount = ((IntWritable) val).get();
+
+            for (int i = 0; i < randomCount; i++) {
+                out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
+            }
+        }
+    }
+    static class RandomGenReducer implements Reducer {
+        public void configure(JobConf job) {
+        }
+
+        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            while (it.hasNext()) {
+                int val = ((IntWritable) it.next()).get();
+                out.collect(new UTF8("" + val), new UTF8(""));
+            }
+        }
+    }
+    static class RandomCheckMapper implements Mapper {
+        public void configure(JobConf job) {
+        }
+
+        public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+            long pos = ((LongWritable) key).get();
+            UTF8 str = (UTF8) val;
+
+            out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
+        }
+    }
+    static class RandomCheckReducer implements Reducer {
+        public void configure(JobConf job) {
+        }
+        
+        public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+            int keyint = ((IntWritable) key).get();
+            int count = 0;
+            while (it.hasNext()) {
+                it.next();
+                count++;
+            }
+            out.collect(new IntWritable(keyint), new IntWritable(count));
+        }
+    }
+
+    int range;
+    int counts;
+    Random r = new Random();
+    Configuration conf;
+
+    /**
+     * MapredLoadTest
+     */
+    public MapredLoadTest(int range, int counts, Configuration conf) throws IOException {
+        this.range = range;
+        this.counts = counts;
+        this.conf = conf;
+    }
+
+    /**
+     * Generates the answer key, runs the generator and checker jobs, and verifies the result.
+     */
+    public void launch() throws IOException {
+        //
+        // Generate distribution of ints.  This is the answer key.
+        //
+        int countsToGo = counts;
+        int dist[] = new int[range];
+        for (int i = 0; i < range; i++) {
+            double avgInts = (1.0 * countsToGo) / (range - i);
+            dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
+            countsToGo -= dist[i];
+        }
+        if (countsToGo > 0) {
+            dist[dist.length-1] += countsToGo;
+        }
+
+        //
+        // Write the answer key to a file.  
+        //
+        NutchFileSystem fs = NutchFileSystem.get(conf);
+        File testdir = new File("mapred.loadtest");
+        fs.mkdirs(testdir);
+
+        File randomIns = new File(testdir, "genins");
+        fs.mkdirs(randomIns);
+
+        File answerkey = new File(randomIns, "answer.key");
+        SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), IntWritable.class, IntWritable.class);
+        try {
+            for (int i = 0; i < range; i++) {
+                out.append(new IntWritable(i), new IntWritable(dist[i]));
+            }
+        } finally {
+            out.close();
+        }
+
+        //
+        // Now we need to generate the random numbers according to
+        // the above distribution.
+        //
+        // We create a lot of map tasks, each of which takes at least
+        // one "line" of the distribution.  (That is, a certain number
+        // X is to be generated Y number of times.)
+        //
+        // A map task emits Y key/val pairs.  The val is X.  The key
+        // is a randomly-generated number.
+        //
+        // The reduce task gets its input sorted by key.  That is, sorted
+        // in random order.  It then emits a single line of text
+        // for each of the given values.  It does not emit the key.
+        //
+        // Because there's just one reduce task, we emit a single big
+        // file of random numbers.
+        //
+        File randomOuts = new File(testdir, "genouts");
+        fs.mkdirs(randomOuts);
+
+
+        JobConf genJob = new JobConf(conf);
+        genJob.setInputDir(randomIns);
+        genJob.setInputKeyClass(IntWritable.class);
+        genJob.setInputValueClass(IntWritable.class);
+        genJob.setInputFormat(SequenceFileInputFormat.class);
+        genJob.setMapperClass(RandomGenMapper.class);
+
+        genJob.setOutputDir(randomOuts);
+        genJob.setOutputKeyClass(IntWritable.class);
+        genJob.setOutputValueClass(IntWritable.class);
+        genJob.setOutputFormat(TextOutputFormat.class);
+        genJob.setReducerClass(RandomGenReducer.class);
+        genJob.setNumReduceTasks(1);
+
+        JobClient.runJob(genJob);
+
+        //
+        // Next, we read the big file in and regenerate the 
+        // original map.
+        //
+        // We have many map tasks, each of which read at least one
+        // of the output numbers.  For each number read in, the
+        // map task emits a key/value pair where the key is the
+        // number and the value is "1".
+        //
+        // We have a single reduce task, which receives its input
+        // sorted by the key emitted above.  For each key, there will
+        // be a certain number of "1" values.  The reduce task sums
+        // these values to compute how many times the given key was
+        // emitted.
+        //
+        // The reduce task then emits a key/val pair where the key
+        // is the number in question, and the value is the number of
+        // times the key was emitted.  This is the same format as the
+        // original answer key (except that numbers emitted zero times
+        // will not appear in the regenerated key.)
+        //
+        File finalOuts = new File(testdir, "finalouts");
+        fs.mkdirs(finalOuts);
+        JobConf checkJob = new JobConf(conf);
+        checkJob.setInputDir(randomOuts);
+        checkJob.setInputKeyClass(LongWritable.class);
+        checkJob.setInputValueClass(UTF8.class);
+        checkJob.setInputFormat(TextInputFormat.class);
+        checkJob.setMapperClass(RandomCheckMapper.class);
+
+        checkJob.setOutputDir(finalOuts);
+        checkJob.setOutputKeyClass(IntWritable.class);
+        checkJob.setOutputValueClass(IntWritable.class);
+        checkJob.setOutputFormat(SequenceFileOutputFormat.class);
+        checkJob.setReducerClass(RandomCheckReducer.class);
+        checkJob.setNumReduceTasks(1);
+
+        JobClient.runJob(checkJob);
+
+        //
+        // Finally, we compare the reconstructed answer key with the
+        // original one.  Remember, we need to ignore zero-count items
+        // in the original key.
+        //
+        boolean success = true;
+        File recomputedkey = new File(finalOuts, "part-00000");
+        SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey.getPath(), conf);
+        int totalseen = 0;
+        try {
+            IntWritable key = new IntWritable();
+            IntWritable val = new IntWritable();            
+            for (int i = 0; i < range; i++) {
+                if (dist[i] == 0) {
+                    continue;
+                }
+                if (! in.next(key, val)) {
+                    System.err.println("Cannot read entry " + i);
+                    success = false;
+                    break;
+                } else {
+                    if ( !((key.get() == i ) && (val.get() == dist[i]))) {
+                        System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
+                        success = false;
+                    }
+                    totalseen += val.get();
+                }
+            }
+            if (success) {
+                if (in.next(key, val)) {
+                    System.err.println("Unnecessary lines in recomputed key!");
+                    success = false;
+                }
+            }
+        } finally {
+            in.close();
+        }
+        int originalTotal = 0;
+        for (int i = 0; i < dist.length; i++) {
+            originalTotal += dist[i];
+        }
+        System.out.println("Original sum: " + originalTotal);
+        System.out.println("Recomputed sum: " + totalseen);
+
+        //
+        // Write to "results" whether the test succeeded or not.
+        //
+        File resultFile = new File(testdir, "results");
+        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
+        try {
+            bw.write("Success=" + success + "\n");
+            System.out.println("Success=" + success);            
+        } finally {
+            bw.close();
+        }
+    }
+
+    /**
+     * Launches all the tasks in order.
+     */
+    public static void main(String[] argv) throws Exception {
+        if (argv.length < 2) {
+            System.err.println("Usage: MapredLoadTest <range> <counts>");
+            System.err.println();
+            System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
+            return;
+        }
+
+        int i = 0;
+        int range = Integer.parseInt(argv[i++]);
+        int counts = Integer.parseInt(argv[i++]);
+
+        MapredLoadTest mlt = new MapredLoadTest(range, counts, new Configuration());
+        mlt.launch();
+    }
+}
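
As main() shows, the whole load test is driven by two integers, range and counts, with counts meant to be substantially larger than range. A minimal programmatic driver, assuming a default Configuration (this mirrors what main() does after parsing its arguments):

    // generate 100000 random ints drawn from [0, 1000) and verify the
    // reconstructed count map against the original answer key
    MapredLoadTest mlt = new MapredLoadTest(1000, 100000, new Configuration());
    mlt.launch();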

+ 115 - 0
src/test/org/apache/hadoop/mapred/TestSequenceFileInputFormat.java

@@ -0,0 +1,115 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+public class TestSequenceFileInputFormat extends TestCase {
+  private static final Logger LOG = InputFormatBase.LOG;
+
+  private static int MAX_LENGTH = 10000;
+  private static Configuration conf = new Configuration();
+
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf(conf);
+    NutchFileSystem fs = NutchFileSystem.getNamed("local", conf);
+    File dir = new File(System.getProperty("test.build.data",".") + "/mapred");
+    File file = new File(dir, "test.seq");
+    
+    Reporter reporter = new Reporter() {
+        public void setStatus(String status) throws IOException {}
+      };
+    
+    int seed = new Random().nextInt();
+    //LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+
+    fs.delete(dir);
+
+    job.setInputDir(dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+      //LOG.info("creating; entries = " + length);
+
+      // create a file with length entries
+      SequenceFile.Writer writer =
+        new SequenceFile.Writer(fs, file.toString(),
+                                IntWritable.class, BytesWritable.class);
+      try {
+        for (int i = 0; i < length; i++) {
+          IntWritable key = new IntWritable(i);
+          byte[] data = new byte[random.nextInt(10)];
+          random.nextBytes(data);
+          BytesWritable value = new BytesWritable(data);
+          writer.append(key, value);
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      InputFormat format = new SequenceFileInputFormat();
+      IntWritable key = new IntWritable();
+      BytesWritable value = new BytesWritable();
+      for (int i = 0; i < 3; i++) {
+        int numSplits =
+          random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
+        //LOG.info("splitting: requesting = " + numSplits);
+        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        //LOG.info("splitting: got =        " + splits.length);
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          RecordReader reader =
+            format.getRecordReader(fs, splits[j], job, reporter);
+          try {
+            int count = 0;
+            while (reader.next(key, value)) {
+              // if (bits.get(key.get())) {
+              // LOG.info("splits["+j+"]="+splits[j]+" : " + key.get());
+              // LOG.info("@"+reader.getPos());
+              // }
+              assertFalse("Key in multiple partitions.", bits.get(key.get()));
+              bits.set(key.get());
+              count++;
+            }
+            //LOG.info("splits["+j+"]="+splits[j]+" count=" + count);
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestSequenceFileInputFormat().testFormat();
+  }
+}
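
The SequenceFile API this test leans on is the same pair MapredLoadTest uses: SequenceFile.Writer(fs, path, keyClass, valueClass) with append(key, value), and SequenceFile.Reader(fs, path, conf) with next(key, value). A minimal round trip, assuming the local NutchFileSystem and the imports used by the test above (the file name is only illustrative):

    NutchFileSystem fs = NutchFileSystem.getNamed("local", conf);
    String file = new File(System.getProperty("test.build.data", "."),
                           "roundtrip.seq").toString();   // hypothetical path

    SequenceFile.Writer writer =
      new SequenceFile.Writer(fs, file, IntWritable.class, BytesWritable.class);
    try {
      byte[] data = {1, 2, 3};
      writer.append(new IntWritable(42), new BytesWritable(data));
    } finally {
      writer.close();
    }

    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    try {
      IntWritable key = new IntWritable();
      BytesWritable value = new BytesWritable();
      while (reader.next(key, value)) {
        // a single record: key.get() == 42, value holds the three bytes above
      }
    } finally {
      reader.close();
    }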

+ 109 - 0
src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -0,0 +1,109 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import java.util.logging.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.*;
+
+public class TestTextInputFormat extends TestCase {
+  private static final Logger LOG = InputFormatBase.LOG;
+
+  private static int MAX_LENGTH = 10000;
+  private static Configuration conf = new Configuration();
+  
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf(conf);
+    NutchFileSystem fs = NutchFileSystem.getNamed("local", conf);
+    File dir = new File(System.getProperty("test.build.data",".") + "/mapred");
+    File file = new File(dir, "test.txt");
+
+    Reporter reporter = new Reporter() {
+        public void setStatus(String status) throws IOException {}
+      };
+    
+    int seed = new Random().nextInt();
+    //LOG.info("seed = "+seed);
+    Random random = new Random(seed);
+
+    fs.delete(dir);
+    job.setInputDir(dir);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length+= random.nextInt(MAX_LENGTH/10)+1) {
+
+      //LOG.info("creating; entries = " + length);
+
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(fs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+
+      // try splitting the file in a variety of sizes
+      InputFormat format = new TextInputFormat();
+      LongWritable key = new LongWritable();
+      UTF8 value = new UTF8();
+      for (int i = 0; i < 3; i++) {
+        int numSplits = random.nextInt(MAX_LENGTH/20)+1;
+        //LOG.info("splitting: requesting = " + numSplits);
+        FileSplit[] splits = format.getSplits(fs, job, numSplits);
+        //LOG.info("splitting: got =        " + splits.length);
+
+        // check each split
+        BitSet bits = new BitSet(length);
+        for (int j = 0; j < splits.length; j++) {
+          RecordReader reader =
+            format.getRecordReader(fs, splits[j], job, reporter);
+          try {
+            int count = 0;
+            while (reader.next(key, value)) {
+              int v = Integer.parseInt(value.toString());
+              //             if (bits.get(v)) {
+              //               LOG.info("splits["+j+"]="+splits[j]+" : " + v);
+              //               LOG.info("@"+reader.getPos());
+              //             }
+              assertFalse("Key in multiple partitions.", bits.get(v));
+              bits.set(v);
+              count++;
+            }
+            //LOG.info("splits["+j+"]="+splits[j]+" count=" + count);
+          } finally {
+            reader.close();
+          }
+        }
+        assertEquals("Some keys in no partition.", length, bits.cardinality());
+      }
+
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestTextInputFormat().testFormat();
+  }
+}

+ 530 - 0
src/test/org/apache/hadoop/dfs/TestDFS.java

@@ -0,0 +1,530 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import junit.framework.AssertionFailedError;
+import org.apache.hadoop.fs.NFSInputStream;
+import org.apache.hadoop.fs.NFSOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.ListIterator;
+import java.util.logging.Logger;
+import java.util.Random;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Test DFS.
+ * TestDFS is a JUnit test for DFS using "pseudo multiprocessing" (or 
+ * more strictly, pseudo distributed), meaning all daemons run in one process
+ * and sockets are used to communicate between daemons.  The test permutes
+ * various block sizes, number of files, file sizes, and number of
+ * datanodes.  After creating 1 or more files and filling them with random
+ * data, one datanode is shut down, and then the files are verified.
+ * Next, all the random test files are deleted and we test for leakage
+ * (non-deletion) by directly checking the real directories corresponding
+ * to the datanodes still running.
+ * <p>
+ * Usage notes: TEST_PERMUTATION_MAX can be adjusted to perform more or
+ * less testing of permutations.  The ceiling of useful permutation is
+ * TEST_PERMUTATION_MAX_CEILING.
+ * <p>
+ * DFSClient emits many messages that can be ignored like:
+ * "Failed to connect to *:7000:java.net.ConnectException: Connection refused: connect"
+ * because a datanode is forced to close during testing.
+ * <p>
+ * Warnings about "Zero targets found" can be ignored (these are naggingly
+ * emitted even though it is not possible to achieve the desired replication
+ * level with the number of active datanodes.)
+ * <p>
+ * Possible Extensions:
+ * <p>Bring a datanode down and restart it to verify reconnection to namenode.
+ * <p>Simulate running out of disk space on one datanode only.
+ * <p>Bring the namenode down and restart it to verify that datanodes reconnect.
+ * <p>
+ * <p>For another approach to filesystem testing, see the high level
+ * (NutchFS level) test {@link org.apache.hadoop.fs.TestNutchFileSystem}.
+ * @author Paul Baclace
+ */
+public class TestDFS extends TestCase implements FSConstants {
+  private static final Logger LOG =
+      LogFormatter.getLogger("org.apache.hadoop.dfs.TestDFS");
+
+  private static Configuration conf = new Configuration();
+  private static int BUFFER_SIZE =
+      conf.getInt("io.file.buffer.size", 4096);
+
+  private static int testCycleNumber = 0;
+
+  /**
+   * all DFS test files go under this base directory
+   */
+  private static String baseDirSpecified;
+
+  /**
+   * base dir as File
+   */
+  private static File baseDir;
+
+  /** DFS block sizes to permute over in multiple test cycles
+   * (array length should be prime).
+   */
+  private static final int[] BLOCK_SIZES = {100000, 4096};
+
+  /** DFS file sizes to permute over in multiple test cycles
+   * (array length should be prime).
+   */
+  private static final int[] FILE_SIZES =
+      {100000, 100001, 4095, 4096, 4097, 1000000, 1000001};
+
+  /** DFS file counts to permute over in multiple test cycles
+   * (array length should be prime).
+   */
+  private static final int[] FILE_COUNTS = {1, 10, 100};
+
+  /** Number of useful permutations or test cycles.
+   * (The factor of 2 represents alternating between starting 2 or 3
+   * datanodes.)
+   */
+  private static final int TEST_PERMUTATION_MAX_CEILING =
+    BLOCK_SIZES.length * FILE_SIZES.length * FILE_COUNTS.length * 2;
+
+  /** Number of permutations of DFS test parameters to perform.
+   * If this is greater than ceiling TEST_PERMUTATION_MAX_CEILING, then the
+   * ceiling value is used.
+   */
+  private static final int TEST_PERMUTATION_MAX = 3;
+  private Constructor randomDataGeneratorCtor = null;
+
+  static {
+    baseDirSpecified = System.getProperty("test.dfs.data", "/tmp/dfs_test");
+    baseDir = new File(baseDirSpecified);
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf.setBoolean("test.dfs.same.host.targets.allowed", true);
+  }
+
+ /**
+  * Remove old files from temp area used by this test case and be sure
+  * base temp directory can be created.
+  */
+  protected void prepareTempFileSpace() {
+    if (baseDir.exists()) {
+      try { // start from a blank slate
+        FileUtil.fullyDelete(baseDir, conf);
+      } catch (Exception ignored) {
+      }
+    }
+    baseDir.mkdirs();
+    if (!baseDir.isDirectory()) {
+      throw new RuntimeException("Value of root directory property test.dfs.data for dfs test is not a directory: "
+          + baseDirSpecified);
+    }
+  }
+
+  /**
+   * Pseudo Distributed FS Test.
+   * Test DFS by running all the necessary daemons in one process.
+   * Test various block sizes, number of files, disk space consumption,
+   * and leakage.
+   *
+   * @throws Exception
+   */
+  public void testFsPseudoDistributed()
+      throws Exception {
+    while (testCycleNumber < TEST_PERMUTATION_MAX &&
+        testCycleNumber < TEST_PERMUTATION_MAX_CEILING) {
+        int blockSize = BLOCK_SIZES[testCycleNumber % BLOCK_SIZES.length];
+        int numFiles = FILE_COUNTS[testCycleNumber % FILE_COUNTS.length];
+        int fileSize = FILE_SIZES[testCycleNumber % FILE_SIZES.length];
+        prepareTempFileSpace();
+        testFsPseudoDistributed(fileSize, numFiles, blockSize,
+            (testCycleNumber % 2) + 2);
+    }
+  }
+
+  /**
+   * Pseudo Distributed FS Testing.
+   * Do one test cycle with given parameters.
+   *
+   * @param nBytes         number of bytes to write to each file.
+   * @param numFiles       number of files to create.
+   * @param blockSize      block size to use for this test cycle.
+   * @param initialDNcount number of datanodes to create
+   * @throws Exception
+   */
+  public void testFsPseudoDistributed(long nBytes, int numFiles,
+                                      int blockSize, int initialDNcount)
+      throws Exception {
+    long startTime = System.currentTimeMillis();
+    int bufferSize = Math.min(BUFFER_SIZE, blockSize);
+    boolean checkDataDirsEmpty = false;
+    int iDatanodeClosed = 0;
+    Random randomDataGenerator = makeRandomDataGenerator();
+    final int currentTestCycleNumber = testCycleNumber;
+    msg("using randomDataGenerator=" + randomDataGenerator.getClass().getName());
+
+    //
+    //     modify config for test
+
+    //
+    // set given config param to override other config settings
+    conf.setInt("test.dfs.block_size", blockSize);
+    // verify that config changed
+    assertTrue(blockSize == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
+    // downsize for testing (just to save resources)
+    conf.setInt("dfs.namenode.handler.count", 3);
+    if (false) { //  use MersenneTwister, if present
+      conf.set("nutch.random.class",
+                          "org.apache.nutch.util.MersenneTwister");
+    }
+    conf.setLong("dfs.blockreport.intervalMsec", 50*1000L);
+    conf.setLong("dfs.datanode.startupMsec", 15*1000L);
+
+    String nameFSDir = baseDirSpecified + "/name";
+    msg("----Start Test Cycle=" + currentTestCycleNumber +
+        " test.dfs.block_size=" + blockSize +
+        " nBytes=" + nBytes +
+        " numFiles=" + numFiles +
+        " initialDNcount=" + initialDNcount);
+
+    //
+    //          start a NameNode
+
+    int nameNodePort = 9000 + testCycleNumber++; // ToDo: settable base port
+    String nameNodeSocketAddr = "localhost:" + nameNodePort;
+    NameNode nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort, conf);
+    DFSClient dfsClient = null;
+    try {
+      //
+      //        start some DataNodes
+      //
+      ArrayList listOfDataNodeDaemons = new ArrayList();
+      conf.set("fs.default.name", nameNodeSocketAddr);
+      for (int i = 0; i < initialDNcount; i++) {
+        // uniquely config real fs path for data storage for this datanode
+        String dataDir = baseDirSpecified + "/datanode" + i;
+        conf.set("dfs.data.dir", dataDir);
+        DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+        if (dn != null) {
+          listOfDataNodeDaemons.add(dn);
+          (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+        }
+      }
+      try {
+        assertTrue("insufficient datanodes for test to continue",
+            (listOfDataNodeDaemons.size() >= 2));
+
+        //
+        //          wait for datanodes to report in
+        awaitQuiescence();
+
+        //  act as if namenode is a remote process
+        dfsClient = new DFSClient(new InetSocketAddress("localhost", nameNodePort), conf);
+
+        //
+        //           write nBytes of data using randomDataGenerator to numFiles
+        //
+        ArrayList testfilesList = new ArrayList();
+        byte[] buffer = new byte[bufferSize];
+        UTF8 testFileName = null;
+        for (int iFileNumber = 0; iFileNumber < numFiles; iFileNumber++) {
+          testFileName = new UTF8("/f" + iFileNumber);
+          testfilesList.add(testFileName);
+          NFSOutputStream nos = dfsClient.create(testFileName, false);
+          try {
+            for (long nBytesWritten = 0L;
+                 nBytesWritten < nBytes;
+                 nBytesWritten += buffer.length) {
+              if ((nBytesWritten + buffer.length) > nBytes) {
+                // calculate byte count needed to exactly hit nBytes in length
+                //  to keep randomDataGenerator in sync during the verify step
+                int pb = (int) (nBytes - nBytesWritten);
+                byte[] bufferPartial = new byte[pb];
+                randomDataGenerator.nextBytes(bufferPartial);
+                nos.write(bufferPartial);
+              } else {
+                randomDataGenerator.nextBytes(buffer);
+                nos.write(buffer);
+              }
+            }
+          } finally {
+            nos.flush();
+            nos.close();
+          }
+        }
+
+        //
+        // No need to wait for blocks to be replicated because replication
+        //  is supposed to be complete when the file is closed.
+        //
+
+        //
+        //                     take one datanode down
+        iDatanodeClosed =
+            currentTestCycleNumber % listOfDataNodeDaemons.size();
+        DataNode dn = (DataNode) listOfDataNodeDaemons.get(iDatanodeClosed);
+        msg("shutdown datanode daemon " + iDatanodeClosed +
+            " dn=" + dn.data);
+        try {
+          dn.shutdown();
+        } catch (Exception e) {
+          msg("ignoring datanode shutdown exception=" + e);
+        }
+
+        //
+        //          verify data against a "rewound" randomDataGenerator
+        //               that all of the data is intact
+        long lastLong = randomDataGenerator.nextLong();
+        randomDataGenerator = makeRandomDataGenerator(); // restart (make new) PRNG
+        ListIterator li = testfilesList.listIterator();
+        while (li.hasNext()) {
+          testFileName = (UTF8) li.next();
+          NFSInputStream nis = dfsClient.open(testFileName);
+          byte[] bufferGolden = new byte[bufferSize];
+          int m = 42;
+          try {
+            while (m != -1) {
+              m = nis.read(buffer);
+              if (m == buffer.length) {
+                randomDataGenerator.nextBytes(bufferGolden);
+                assertBytesEqual(buffer, bufferGolden, buffer.length);
+              } else if (m > 0) {
+                byte[] bufferGoldenPartial = new byte[m];
+                randomDataGenerator.nextBytes(bufferGoldenPartial);
+                assertBytesEqual(buffer, bufferGoldenPartial, bufferGoldenPartial.length);
+              }
+            }
+          } finally {
+            nis.close();
+          }
+        }
+        // verify last randomDataGenerator rand val to ensure last file length was checked
+        long lastLongAgain = randomDataGenerator.nextLong();
+        assertEquals(lastLong, lastLongAgain);
+        msg("Finished validating all file contents");
+
+        //
+        //                    now delete all the created files
+        msg("Delete all random test files under DFS via remaining datanodes");
+        li = testfilesList.listIterator();
+        while (li.hasNext()) {
+          testFileName = (UTF8) li.next();
+          assertTrue(dfsClient.delete(testFileName));
+        }
+
+        //
+        //                   wait for delete to be propagated
+        //                  (unlike writing files, delete is lazy)
+        msg("Test thread sleeping while datanodes propagate delete...");
+        awaitQuiescence();
+        msg("Test thread awakens to verify file contents");
+
+        //
+        //             check that the datanode's block directory is empty
+        //                (except for datanode that had forced shutdown)
+        checkDataDirsEmpty = true; // do it during finally clause
+
+      } catch (AssertionFailedError afe) {
+        throw afe;
+      } catch (Throwable t) {
+        msg("Unexpected exception_b: " + t);
+        t.printStackTrace();
+      } finally {
+        //
+        // shut down datanode daemons (this takes advantage of being same-process)
+        msg("begin shutdown of all datanode daemons for test cycle " +
+            currentTestCycleNumber);
+
+        for (int i = 0; i < listOfDataNodeDaemons.size(); i++) {
+          DataNode dataNode = (DataNode) listOfDataNodeDaemons.get(i);
+          if (i != iDatanodeClosed) {
+            try {
+              if (checkDataDirsEmpty) {
+                File dataDir = new File(dataNode.data.dirpath);
+                assertNoBlocks(dataDir);
+
+              }
+              dataNode.shutdown();
+            } catch (Exception e) {
+              msg("ignoring exception during (all) datanode shutdown, e=" + e);
+            }
+          }
+        }
+      }
+      msg("finished shutdown of all datanode daemons for test cycle " +
+          currentTestCycleNumber);
+      if (dfsClient != null) {
+        try {
+          msg("close down subthreads of DFSClient");
+          dfsClient.close();
+        } catch (Exception ignored) { }
+        msg("finished close down of DFSClient");
+      }
+    } catch (AssertionFailedError afe) {
+      throw afe;
+    } catch (Throwable t) {
+      msg("Unexpected exception_a: " + t);
+      t.printStackTrace();
+    } finally {
+      // shut down namenode daemon (this takes advantage of being same-process)
+      msg("begin shutdown of namenode daemon for test cycle " +
+          currentTestCycleNumber);
+      try {
+        nameNodeDaemon.stop();
+      } catch (Exception e) {
+        msg("ignoring namenode shutdown exception=" + e);
+      }
+      msg("finished shutdown of namenode daemon for test cycle " +
+          currentTestCycleNumber);
+    }
+    msg("test cycle " + currentTestCycleNumber + " elapsed time=" +
+        (System.currentTimeMillis() - startTime) / 1000. + "sec");
+    msg("threads still running (look for stragglers): ");
+    msg(summarizeThreadGroup());
+  }
+
+  private void assertNoBlocks(File datanodeDir) {
+    File datanodeDataDir = new File(datanodeDir, "data");
+    String[] blockFilenames =
+        datanodeDataDir.list(
+            new FilenameFilter() {
+              public boolean accept(File dir, String name){
+                return Block.isBlockFilename(new File(dir, name));}});
+    // if this fails, the delete did not propagate because either
+    //   awaitQuiescence() returned before the disk images were removed
+    //   or a real failure was detected.
+    assertTrue(" data dir not empty: " + datanodeDataDir,
+               blockFilenames.length==0);
+  }
+
+  /**
+   * Make a data generator.
+   * Allows optional use of high quality PRNG by setting property
+   * nutch.random.class to the full class path of a subclass of
+   * java.util.Random such as "...util.MersenneTwister".
+   * The property test.dfs.random.seed can supply a seed for reproducible
+   * testing (a default is set here if property is not set.)
+   */
+  private Random makeRandomDataGenerator() {
+    long seed = conf.getLong("test.dfs.random.seed", 0xB437EF);
+    try {
+      if (randomDataGeneratorCtor == null) {
+        // lazy init
+        String rndDataGenClassname =
+            conf.get("nutch.random.class", "java.util.Random");
+        Class clazz = Class.forName(rndDataGenClassname);
+        randomDataGeneratorCtor = clazz.getConstructor(new Class[]{Long.TYPE});
+      }
+
+      if (randomDataGeneratorCtor != null) {
+        Object arg[] = {new Long(seed)};
+        return (Random) randomDataGeneratorCtor.newInstance(arg);
+      }
+    } catch (ClassNotFoundException absorb) {
+    } catch (NoSuchMethodException absorb) {
+    } catch (SecurityException absorb) {
+    } catch (InstantiationException absorb) {
+    } catch (IllegalAccessException absorb) {
+    } catch (IllegalArgumentException absorb) {
+    } catch (InvocationTargetException absorb) {
+    }
+
+    // last resort
+    return new java.util.Random(seed);
+  }
+
+  /** Wait for the DFS datanodes to become quiescent.
+   * The initial implementation is to sleep for some fixed amount of time,
+   * but a better implementation would be to really detect when distributed
+   * operations are completed.
+   * @throws InterruptedException
+   */
+  private void awaitQuiescence() throws InterruptedException {
+    // ToDo: Need observer pattern, not static sleep
+    // Doug suggested that the block report interval could be made shorter
+    //   and then observing that would be a good way to know when an operation
+    //   was complete (quiescence detect).
+    sleepAtLeast(60000);
+  }
+
+  private void assertBytesEqual(byte[] buffer, byte[] bufferGolden, int len) {
+    for (int i = 0; i < len; i++) {
+      assertEquals(buffer[i], bufferGolden[i]);
+    }
+  }
+
+  private void msg(String s) {
+    //System.out.println(s);
+    LOG.info(s);
+  }
+
+  public static void sleepAtLeast(int tmsec) {
+    long t0 = System.currentTimeMillis();
+    long t1 = t0;
+    long tslept = t1 - t0;
+    while (tmsec > tslept) {
+      try {
+        long tsleep = tmsec - tslept;
+        Thread.sleep(tsleep);
+        t1 = System.currentTimeMillis();
+      }  catch (InterruptedException ie) {
+        t1 = System.currentTimeMillis();
+      }
+      tslept = t1 - t0;
+    }
+  }
+
+  public static String summarizeThreadGroup() {
+    int n = 10;
+    int k = 0;
+    Thread[] tarray = null;
+    StringBuffer sb = new StringBuffer(500);
+    do {
+      n = n * 10;
+      tarray = new Thread[n];
+      k = Thread.enumerate(tarray);
+    } while (k == n); // while array is too small...
+    for (int i = 0; i < k; i++) {
+      Thread thread = tarray[i];
+      sb.append(thread.toString());
+      sb.append("\n");
+    }
+    return sb.toString();
+  }
+
+  public static void main(String[] args) throws Exception {
+    String usage = "Usage: TestDFS (no args)";
+    if (args.length != 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+    String[] testargs = {"org.apache.hadoop.dfs.TestDFS"};
+    junit.textui.TestRunner.main(testargs);
+  }
+
+}
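
As the makeRandomDataGenerator javadoc notes, the test's data generator is controlled entirely through configuration keys rather than code changes. A sketch of how those keys would be set ahead of a run (illustrative only; the MersenneTwister class is optional and must actually be on the classpath, otherwise the code falls back to java.util.Random):

    Configuration conf = new Configuration();
    // reproducible data: pin the generator seed (the built-in default is 0xB437EF)
    conf.setLong("test.dfs.random.seed", 12345L);
    // optional higher-quality PRNG; any java.util.Random subclass with a (long) constructor
    conf.set("nutch.random.class", "org.apache.nutch.util.MersenneTwister");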