瀏覽代碼

HADOOP-2033. Make SequenceFile.Writer.sync actually write a sync block.
Contributed by omalley.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@585648 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 年之前
父節點
當前提交
8a50b5e5c0

+ 4 - 0
CHANGES.txt

@@ -321,6 +321,10 @@ Branch 0.15 (unreleased changes)
     null was missing and hence NPE would be thrown sometimes. This issue fixes
     null was missing and hence NPE would be thrown sometimes. This issue fixes
     that problem.  (Amareshwari Sri Ramadasu via ddas) 
     that problem.  (Amareshwari Sri Ramadasu via ddas) 
 
 
+    HADOOP-2033.  The SequenceFile.Writer.sync method was a no-op, which caused
+    very uneven splits for applications like distcp that rely on explicit sync
+    points.  (omalley)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
     HADOOP-1908. Restructure data node code so that block sending and 
     HADOOP-1908. Restructure data node code so that block sending and 

+ 6 - 1
src/java/org/apache/hadoop/io/MapFile.java

@@ -19,6 +19,9 @@
 package org.apache.hadoop.io;
 package org.apache.hadoop.io;
 
 
 import java.io.*;
 import java.io.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progressable;
@@ -44,6 +47,8 @@ import org.apache.hadoop.io.compress.DefaultCodec;
  * SequenceFile.Sorter}.
  * SequenceFile.Sorter}.
  */
  */
 public class MapFile {
 public class MapFile {
+  private static final Log LOG = LogFactory.getLog(MapFile.class);
+
   /** The name of the index file. */
   /** The name of the index file. */
   public static final String INDEX_FILE_NAME = "index";
   public static final String INDEX_FILE_NAME = "index";
 
 
@@ -302,7 +307,7 @@ public class MapFile {
           count++;
           count++;
         }
         }
       } catch (EOFException e) {
       } catch (EOFException e) {
-        SequenceFile.LOG.warn("Unexpected EOF reading " + index +
+        LOG.warn("Unexpected EOF reading " + index +
                               " at entry #" + count + ".  Ignoring.");
                               " at entry #" + count + ".  Ignoring.");
       } finally {
       } finally {
 	indexClosed = true;
 	indexClosed = true;

+ 38 - 39
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.io;
 
 
 import java.io.*;
 import java.io.*;
 import java.util.*;
 import java.util.*;
-import java.net.InetAddress;
 import java.rmi.server.UID;
 import java.rmi.server.UID;
 import java.security.MessageDigest;
 import java.security.MessageDigest;
 import org.apache.commons.logging.*;
 import org.apache.commons.logging.*;
@@ -43,8 +42,7 @@ import org.apache.hadoop.util.PriorityQueue;
 
 
 /** Support for flat files of binary key/value pairs. */
 /** Support for flat files of binary key/value pairs. */
 public class SequenceFile {
 public class SequenceFile {
-  public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.io.SequenceFile");
+  private static final Log LOG = LogFactory.getLog(SequenceFile.class);
 
 
   private SequenceFile() {}                         // no public ctor
   private SequenceFile() {}                         // no public ctor
 
 
@@ -757,6 +755,11 @@ public class SequenceFile {
     
     
     /** create a sync point */
     /** create a sync point */
     public void sync() throws IOException {
     public void sync() throws IOException {
+      if (sync != null && lastSyncPos != out.getPos()) {
+        out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
+        out.write(sync);                          // write sync
+        lastSyncPos = out.getPos();               // update lastSyncPos
+      }
     }
     }
 
 
     /** Returns the configuration of this file. */
     /** Returns the configuration of this file. */
@@ -780,10 +783,7 @@ public class SequenceFile {
     synchronized void checkAndWriteSync() throws IOException {
     synchronized void checkAndWriteSync() throws IOException {
       if (sync != null &&
       if (sync != null &&
           out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
           out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
-        lastSyncPos = out.getPos();               // update lastSyncPos
-        //LOG.info("sync@"+lastSyncPos);
-        out.writeInt(SYNC_ESCAPE);                // escape it
-        out.write(sync);                          // write sync
+        sync();
       }
       }
     }
     }
 
 
@@ -955,10 +955,6 @@ public class SequenceFile {
       val.writeCompressedBytes(out);              // 'value' data
       val.writeCompressedBytes(out);              // 'value' data
     }
     }
     
     
-
-    public void sync() throws IOException {
-    }
-   
   } // RecordCompressionWriter
   } // RecordCompressionWriter
 
 
   /** Write compressed key/value blocks to a sequence-format file. */
   /** Write compressed key/value blocks to a sequence-format file. */
@@ -1045,13 +1041,9 @@ public class SequenceFile {
     }
     }
     
     
     /** Compress and flush contents to dfs */
     /** Compress and flush contents to dfs */
-    private synchronized void writeBlock() throws IOException {
+    public synchronized void sync() throws IOException {
       if (noBufferedRecords > 0) {
       if (noBufferedRecords > 0) {
-        // Write 'sync' marker
-        if (sync != null) {
-          out.writeInt(SYNC_ESCAPE);
-          out.write(sync);
-        }
+        super.sync();
         
         
         // No. of records
         // No. of records
         WritableUtils.writeVInt(out, noBufferedRecords);
         WritableUtils.writeVInt(out, noBufferedRecords);
@@ -1080,15 +1072,11 @@ public class SequenceFile {
     /** Close the file. */
     /** Close the file. */
     public synchronized void close() throws IOException {
     public synchronized void close() throws IOException {
       if (out != null) {
       if (out != null) {
-        writeBlock();
+        sync();
       }
       }
       super.close();
       super.close();
     }
     }
 
 
-    public void sync() throws IOException {
-      writeBlock();
-    }
-
     /** Append a key/value pair. */
     /** Append a key/value pair. */
     public synchronized void append(Writable key, Writable val)
     public synchronized void append(Writable key, Writable val)
       throws IOException {
       throws IOException {
@@ -1116,7 +1104,7 @@ public class SequenceFile {
       // Compress and flush?
       // Compress and flush?
       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
       if (currentBlockSize >= compressionBlockSize) {
       if (currentBlockSize >= compressionBlockSize) {
-        writeBlock();
+        sync();
       }
       }
     }
     }
     
     
@@ -1144,7 +1132,7 @@ public class SequenceFile {
       // Compress and flush?
       // Compress and flush?
       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
       if (currentBlockSize >= compressionBlockSize) {
       if (currentBlockSize >= compressionBlockSize) {
-        writeBlock();
+        sync();
       }
       }
     }
     }
   
   
@@ -1586,15 +1574,26 @@ public class SequenceFile {
       return more;
       return more;
     }
     }
     
     
-    private synchronized int checkAndReadSync(int length) 
-      throws IOException {
+    /**
+     * Read and return the next record length, potentially skipping over 
+     * a sync block.
+     * @return the length of the next record or -1 if there is no next record
+     * @throws IOException
+     */
+    private synchronized int readRecordLength() throws IOException {
+      if (in.getPos() >= end) {
+        return -1;
+      }      
+      int length = in.readInt();
       if (version > 1 && sync != null &&
       if (version > 1 && sync != null &&
           length == SYNC_ESCAPE) {              // process a sync entry
           length == SYNC_ESCAPE) {              // process a sync entry
-        //LOG.info("sync@"+in.getPos());
         in.readFully(syncCheck);                // read syncCheck
         in.readFully(syncCheck);                // read syncCheck
         if (!Arrays.equals(sync, syncCheck))    // check it
         if (!Arrays.equals(sync, syncCheck))    // check it
           throw new IOException("File is corrupt!");
           throw new IOException("File is corrupt!");
         syncSeen = true;
         syncSeen = true;
+        if (in.getPos() >= end) {
+          return -1;
+        }
         length = in.readInt();                  // re-read length
         length = in.readInt();                  // re-read length
       } else {
       } else {
         syncSeen = false;
         syncSeen = false;
@@ -1614,11 +1613,11 @@ public class SequenceFile {
         throw new IOException("Unsupported call for block-compressed" +
         throw new IOException("Unsupported call for block-compressed" +
                               " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
                               " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
       }
       }
-      if (in.getPos() >= end)
-        return -1;
-
       try {
       try {
-        int length = checkAndReadSync(in.readInt());
+        int length = readRecordLength();
+        if (length == -1) {
+          return -1;
+        }
         int keyLength = in.readInt();
         int keyLength = in.readInt();
         buffer.write(in, length);
         buffer.write(in, length);
         return keyLength;
         return keyLength;
@@ -1642,16 +1641,16 @@ public class SequenceFile {
      * Read 'raw' records.
      * Read 'raw' records.
      * @param key - The buffer into which the key is read
      * @param key - The buffer into which the key is read
      * @param val - The 'raw' value
      * @param val - The 'raw' value
-     * @return Returns the total record length
+     * @return Returns the total record length or -1 for end of file
      * @throws IOException
      * @throws IOException
      */
      */
     public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
     public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
       throws IOException {
       throws IOException {
       if (!blockCompressed) {
       if (!blockCompressed) {
-        if (in.getPos() >= end) 
+        int length = readRecordLength();
+        if (length == -1) {
           return -1;
           return -1;
-
-        int length = checkAndReadSync(in.readInt());
+        }
         int keyLength = in.readInt();
         int keyLength = in.readInt();
         int valLength = length - keyLength;
         int valLength = length - keyLength;
         key.write(in, keyLength);
         key.write(in, keyLength);
@@ -1701,16 +1700,16 @@ public class SequenceFile {
     /**
     /**
      * Read 'raw' keys.
      * Read 'raw' keys.
      * @param key - The buffer into which the key is read
      * @param key - The buffer into which the key is read
-     * @return Returns the key length
+     * @return Returns the key length or -1 for end of file
      * @throws IOException
      * @throws IOException
      */
      */
     public int nextRawKey(DataOutputBuffer key) 
     public int nextRawKey(DataOutputBuffer key) 
       throws IOException {
       throws IOException {
       if (!blockCompressed) {
       if (!blockCompressed) {
-        if (in.getPos() >= end) 
+        recordLength = readRecordLength();
+        if (recordLength == -1) {
           return -1;
           return -1;
-
-        recordLength = checkAndReadSync(in.readInt());
+        }
         keyLength = in.readInt();
         keyLength = in.readInt();
         key.write(in, keyLength);
         key.write(in, keyLength);
         return keyLength;
         return keyLength;

+ 1 - 1
src/test/org/apache/hadoop/io/TestArrayFile.java

@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.*;
 
 
 /** Unit tests for ArrayFile. */
 /** Unit tests for ArrayFile. */
 public class TestArrayFile extends TestCase {
 public class TestArrayFile extends TestCase {
-  private static Log LOG = SequenceFile.LOG;
+  private static final Log LOG = LogFactory.getLog(TestArrayFile.class);
   private static String FILE =
   private static String FILE =
     System.getProperty("test.build.data",".") + "/test.array";
     System.getProperty("test.build.data",".") + "/test.array";
 
 

+ 1 - 1
src/test/org/apache/hadoop/io/TestSequenceFile.java

@@ -35,7 +35,7 @@ import org.apache.hadoop.conf.*;
 
 
 /** Unit tests for SequenceFile. */
 /** Unit tests for SequenceFile. */
 public class TestSequenceFile extends TestCase {
 public class TestSequenceFile extends TestCase {
-  private static Log LOG = SequenceFile.LOG;
+  private static final Log LOG = LogFactory.getLog(TestSequenceFile.class);
 
 
   private static Configuration conf = new Configuration();
   private static Configuration conf = new Configuration();
   
   

+ 1 - 1
src/test/org/apache/hadoop/io/TestSetFile.java

@@ -30,7 +30,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 
 /** Unit tests for SetFile. */
 /** Unit tests for SetFile. */
 public class TestSetFile extends TestCase {
 public class TestSetFile extends TestCase {
-  private static Log LOG = SequenceFile.LOG;
+  private static final Log LOG = LogFactory.getLog(TestSetFile.class);
   private static String FILE =
   private static String FILE =
     System.getProperty("test.build.data",".") + "/test.set";
     System.getProperty("test.build.data",".") + "/test.set";