Browse Source

HADOOP-532. Fix a bug reading value-compressed sequence files, where an exception was thrown reporting that the full value had not been read. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@443532 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
3643471905

+ 4 - 0
CHANGES.txt

@@ -17,6 +17,10 @@ Trunk (unreleased changes)
 4. HADOOP-288.  Add a file caching system and use it in MapReduce to
 4. HADOOP-288.  Add a file caching system and use it in MapReduce to
    cache job jar files on slave nodes.  (Mahadev Konar via cutting)
    cache job jar files on slave nodes.  (Mahadev Konar via cutting)
 
 
+5. HADOOP-532.  Fix a bug reading value-compressed sequence files,
+   where an exception was thrown reporting that the full value had not
+   been read.  (omalley via cutting)
+
 
 
 Release 0.6.1 - 2006-08-13
 Release 0.6.1 - 2006-08-13
 
 

+ 16 - 8
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -976,10 +976,14 @@ public class SequenceFile {
 
 
       if (version > 2) {                          // if version > 2
       if (version > 2) {                          // if version > 2
         this.decompress = in.readBoolean();       // is compressed?
         this.decompress = in.readBoolean();       // is compressed?
+      } else {
+        decompress = false;
       }
       }
 
 
       if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
       if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
         this.blockCompressed = in.readBoolean();  // is block-compressed?
         this.blockCompressed = in.readBoolean();  // is block-compressed?
+      } else {
+        blockCompressed = false;
       }
       }
       
       
       // if version >= 5
       // if version >= 5
@@ -1008,9 +1012,9 @@ public class SequenceFile {
       valBuffer = new DataInputBuffer();
       valBuffer = new DataInputBuffer();
       if (decompress) {
       if (decompress) {
         valInFilter = this.codec.createInputStream(valBuffer);
         valInFilter = this.codec.createInputStream(valBuffer);
-        valIn = new DataInputStream(new BufferedInputStream(valInFilter));
+        valIn = new DataInputStream(valInFilter);
       } else {
       } else {
-        valIn = new DataInputStream(new BufferedInputStream(valBuffer));
+        valIn = valBuffer;
       }
       }
       
       
       if (blockCompressed) {
       if (blockCompressed) {
@@ -1113,10 +1117,11 @@ public class SequenceFile {
      * corresponding to the 'current' key 
      * corresponding to the 'current' key 
      */
      */
     private synchronized void seekToCurrentValue() throws IOException {
     private synchronized void seekToCurrentValue() throws IOException {
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         if (decompress) {
         if (decompress) {
           valInFilter.resetState();
           valInFilter.resetState();
         }
         }
+        valBuffer.reset();
       } else {
       } else {
         // Check if this is the first value in the 'block' to be read
         // Check if this is the first value in the 'block' to be read
         if (lazyDecompress && !valuesDecompressed) {
         if (lazyDecompress && !valuesDecompressed) {
@@ -1160,13 +1165,15 @@ public class SequenceFile {
       // Position stream to 'current' value
       // Position stream to 'current' value
       seekToCurrentValue();
       seekToCurrentValue();
 
 
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         val.readFields(valIn);
         val.readFields(valIn);
         
         
-        if (valBuffer.getPosition() != valBuffer.getLength())
+        if (valIn.read() > 0) {
+          LOG.info("available bytes: " + valIn.available());
           throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
           throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
               + " bytes, should read " +
               + " bytes, should read " +
               (valBuffer.getLength()-keyLength));
               (valBuffer.getLength()-keyLength));
+        }
       } else {
       } else {
         // Get the value
         // Get the value
         int valLength = WritableUtils.readVInt(valLenIn);
         int valLength = WritableUtils.readVInt(valLenIn);
@@ -1190,7 +1197,7 @@ public class SequenceFile {
         throw new IOException("wrong key class: "+key.getClass().getName()
         throw new IOException("wrong key class: "+key.getClass().getName()
             +" is not "+keyClass);
             +" is not "+keyClass);
 
 
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         outBuf.reset();
         outBuf.reset();
         
         
         keyLength = next(outBuf);
         keyLength = next(outBuf);
@@ -1200,6 +1207,7 @@ public class SequenceFile {
         valBuffer.reset(outBuf.getData(), outBuf.getLength());
         valBuffer.reset(outBuf.getData(), outBuf.getLength());
         
         
         key.readFields(valBuffer);
         key.readFields(valBuffer);
+        valBuffer.mark(0);
         if (valBuffer.getPosition() != keyLength)
         if (valBuffer.getPosition() != keyLength)
           throw new IOException(key + " read " + valBuffer.getPosition()
           throw new IOException(key + " read " + valBuffer.getPosition()
               + " bytes, should read " + keyLength);
               + " bytes, should read " + keyLength);
@@ -1271,7 +1279,7 @@ public class SequenceFile {
     /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
     /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
     public synchronized int next(DataOutputBuffer buffer) throws IOException {
     public synchronized int next(DataOutputBuffer buffer) throws IOException {
       // Unsupported for block-compressed sequence files
       // Unsupported for block-compressed sequence files
-      if (version >= BLOCK_COMPRESS_VERSION && blockCompressed) {
+      if (blockCompressed) {
         throw new IOException("Unsupported call for block-compressed" +
         throw new IOException("Unsupported call for block-compressed" +
             " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
             " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
       }
       }
@@ -1308,7 +1316,7 @@ public class SequenceFile {
      */
      */
     public int nextRaw(DataOutputBuffer key, ValueBytes val) 
     public int nextRaw(DataOutputBuffer key, ValueBytes val) 
     throws IOException {
     throws IOException {
-      if (version < BLOCK_COMPRESS_VERSION || blockCompressed == false) {
+      if (!blockCompressed) {
         if (in.getPos() >= end) 
         if (in.getPos() >= end) 
           return -1;
           return -1;
 
 

+ 5 - 1
src/test/org/apache/hadoop/io/RandomDatum.java

@@ -26,11 +26,15 @@ public class RandomDatum implements WritableComparable {
   public RandomDatum() {}
   public RandomDatum() {}
 
 
   public RandomDatum(Random random) {
   public RandomDatum(Random random) {
-    length = 10 + random.nextInt(100);
+    length = 10 + (int) Math.pow(10.0, random.nextFloat() * 3.0);
     data = new byte[length];
     data = new byte[length];
     random.nextBytes(data);
     random.nextBytes(data);
   }
   }
 
 
+  public int getLength() {
+    return length;
+  }
+  
   public void write(DataOutput out) throws IOException {
   public void write(DataOutput out) throws IOException {
     out.writeInt(length);
     out.writeInt(length);
     out.write(data);
     out.write(data);

+ 37 - 20
src/test/org/apache/hadoop/io/TestSequenceFile.java

@@ -47,6 +47,7 @@ public class TestSequenceFile extends TestCase {
       new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
       new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
  
  
     int seed = new Random().nextInt();
     int seed = new Random().nextInt();
+    LOG.info("Seed = " + seed);
 
 
     FileSystem fs = new LocalFileSystem(conf);
     FileSystem fs = new LocalFileSystem(conf);
     try {
     try {
@@ -115,7 +116,8 @@ public class TestSequenceFile extends TestCase {
       CompressionType compressionType)
       CompressionType compressionType)
     throws IOException {
     throws IOException {
     fs.delete(file);
     fs.delete(file);
-    LOG.debug("creating with " + count + " records");
+    LOG.info("creating " + count + " records with " + compressionType +
+              " compression");
     SequenceFile.Writer writer = 
     SequenceFile.Writer writer = 
       SequenceFile.createWriter(fs, conf, file, 
       SequenceFile.createWriter(fs, conf, file, 
           RandomDatum.class, RandomDatum.class, compressionType);
           RandomDatum.class, RandomDatum.class, compressionType);
@@ -146,25 +148,36 @@ public class TestSequenceFile extends TestCase {
       RandomDatum key = generator.getKey();
       RandomDatum key = generator.getKey();
       RandomDatum value = generator.getValue();
       RandomDatum value = generator.getValue();
 
 
-      if ((i%5) == 10) {
-        // Testing 'raw' apis
-        rawKey.reset();
-        reader.nextRaw(rawKey, rawValue);
-      } else {
-        // Testing 'non-raw' apis 
-        if ((i%2) == 0) {
-          reader.next(k);
-          reader.getCurrentValue(v);
+      try {
+        if ((i%5) == 10) {
+          // Testing 'raw' apis
+          rawKey.reset();
+          reader.nextRaw(rawKey, rawValue);
         } else {
         } else {
-          reader.next(k, v);
+          // Testing 'non-raw' apis 
+          if ((i%2) == 0) {
+            reader.next(k);
+            reader.getCurrentValue(v);
+          } else {
+            reader.next(k, v);
+          }
+          // Sanity check
+          if (!k.equals(key))
+            throw new RuntimeException("wrong key at " + i);
+          if (!v.equals(value))
+            throw new RuntimeException("wrong value at " + i);
         }
         }
-
-        // Sanity check
-        if (!k.equals(key))
-          throw new RuntimeException("wrong key at " + i);
-        if (!v.equals(value))
-          throw new RuntimeException("wrong value at " + i);
+      } catch (IOException ioe) {
+        LOG.info("Problem on row " + i);
+        LOG.info("Expected value = " + value);
+        LOG.info("Expected len = " + value.getLength());
+        LOG.info("Actual value = " + v);
+        LOG.info("Actual len = " + v.getLength());
+        LOG.info("Key equals: " + k.equals(key));
+        LOG.info("value equals: " + v.equals(value));
+        throw ioe;
       }
       }
+
     }
     }
     reader.close();
     reader.close();
   }
   }
@@ -284,9 +297,11 @@ public class TestSequenceFile extends TestCase {
     boolean merge = false;
     boolean merge = false;
     String compressType = "NONE";
     String compressType = "NONE";
     Path file = null;
     Path file = null;
+    int seed = new Random().nextInt();
 
 
     String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +
     String usage = "Usage: SequenceFile (-local | -dfs <namenode:port>) " +
-        "[-count N] " + "[-check] [-compressType <NONE|RECORD|BLOCK>] " +
+        "[-count N] " + 
+        "[-seed #] [-check] [-compressType <NONE|RECORD|BLOCK>] " +
         "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
         "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
         " file";
         " file";
     if (args.length == 0) {
     if (args.length == 0) {
@@ -304,7 +319,9 @@ public class TestSequenceFile extends TestCase {
           } else if (args[i].equals("-megabytes")) {
           } else if (args[i].equals("-megabytes")) {
               megabytes = Integer.parseInt(args[++i]);
               megabytes = Integer.parseInt(args[++i]);
           } else if (args[i].equals("-factor")) {
           } else if (args[i].equals("-factor")) {
-              factor = Integer.parseInt(args[++i]);
+            factor = Integer.parseInt(args[++i]);
+          } else if (args[i].equals("-seed")) {
+            seed = Integer.parseInt(args[++i]);
           } else if (args[i].equals("-rwonly")) {
           } else if (args[i].equals("-rwonly")) {
               rwonly = true;
               rwonly = true;
           } else if (args[i].equals("-nocreate")) {
           } else if (args[i].equals("-nocreate")) {
@@ -326,6 +343,7 @@ public class TestSequenceFile extends TestCase {
         LOG.info("megabytes = " + megabytes);
         LOG.info("megabytes = " + megabytes);
         LOG.info("factor = " + factor);
         LOG.info("factor = " + factor);
         LOG.info("create = " + create);
         LOG.info("create = " + create);
+        LOG.info("seed = " + seed);
         LOG.info("rwonly = " + rwonly);
         LOG.info("rwonly = " + rwonly);
         LOG.info("check = " + check);
         LOG.info("check = " + check);
         LOG.info("fast = " + fast);
         LOG.info("fast = " + fast);
@@ -338,7 +356,6 @@ public class TestSequenceFile extends TestCase {
           System.exit(-1);
           System.exit(-1);
         }
         }
 
 
-        int seed = 0;
         CompressionType compressionType = 
         CompressionType compressionType = 
           CompressionType.valueOf(compressType);
           CompressionType.valueOf(compressType);