
HADOOP-1398. Add HBase in-memory block cache. Contributed by Tom White.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@618347 13f79535-47bb-0310-9956-ffa450edef68
Thomas White committed 17 years ago
commit aeadbeff02
23 changed files, with 825 additions and 233 deletions
  1. CHANGES.txt (+2, -0)
  2. src/contrib/hbase/conf/hbase-default.xml (+6, -0)
  3. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java (+39, -1)
  4. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (+18, -5)
  5. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java (+88, -2)
  6. src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (+4, -4)
  7. src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/AlterCommand.java (+5, -1)
  8. src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/HQLParser.jj (+6, -0)
  9. src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/SchemaModificationCommand.java (+6, -1)
  10. src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/generated/HQLParser.java (+23, -18)
  11. src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/generated/HQLParserConstants.java (+17, -15)
  12. src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/generated/HQLParserTokenManager.java (+205, -173)
  13. src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java (+206, -0)
  14. src/contrib/hbase/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift (+2, -1)
  15. src/contrib/hbase/src/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (+3, -1)
  16. src/contrib/hbase/src/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java (+24, -1)
  17. src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (+3, -3)
  18. src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBloomFilters.java (+2, -0)
  19. src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (+1, -1)
  20. src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestToString.java (+5, -4)
  21. src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestBlockFSInputStream.java (+121, -0)
  22. src/java/org/apache/hadoop/io/MapFile.java (+29, -1)
  23. src/java/org/apache/hadoop/io/SequenceFile.java (+10, -1)

+ 2 - 0
CHANGES.txt

@@ -7,6 +7,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 6 - 0
src/contrib/hbase/conf/hbase-default.xml

@@ -221,6 +221,12 @@
     such as hlog.
     </description>
   </property>
+  <property>
+    <name>hbase.hstore.blockCache.blockSize</name>
+    <value>65536</value>
+    <description>The size of each block in any block caches.
+    </description>
+  </property>
 
   <!-- HbaseShell Configurations -->
   <property>

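The new property is read through a standard Configuration lookup. A minimal sketch, assuming the contrib HBaseConfiguration class of this era, of fetching the block size with the same 64 KB fallback that HStoreFile uses further down; the class and method names here are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class BlockSizeConfigExample {
      public static int blockCacheBlockSize() {
        // HBaseConfiguration (assumed here) layers hbase-default.xml and any
        // hbase-site.xml override on top of the Hadoop defaults.
        Configuration conf = new HBaseConfiguration();
        // Same property name and 64 KB fallback as HStoreFile uses in this patch.
        return conf.getInt("hbase.hstore.blockCache.blockSize", 64 * 1024);
      }
    }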
+ 39 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java

@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.io.TextSequence;
 public class HColumnDescriptor implements WritableComparable {
   
   // For future backward compatibility
-  private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)1;
+  private static final byte COLUMN_DESCRIPTOR_VERSION = (byte)2;
   
   /** Legal family names can only contain 'word characters' and end in a colon. */
   public static final Pattern LEGAL_FAMILY_NAME = Pattern.compile("\\w+:");
@@ -76,6 +76,11 @@ public class HColumnDescriptor implements WritableComparable {
    */
   public static final boolean DEFAULT_IN_MEMORY = false;
   
+  /**
+   * Default setting for whether to use a block cache or not.
+   */
+  public static final boolean DEFAULT_BLOCK_CACHE_ENABLED = false;
+  
   /**
    * Default maximum length of cell contents.
    */
@@ -95,6 +100,8 @@ public class HColumnDescriptor implements WritableComparable {
   private CompressionType compressionType;
   // Serve reads from in-memory cache
   private boolean inMemory;
+  // Serve reads from in-memory block cache
+  private boolean blockCacheEnabled;
   // Maximum value size
   private int maxValueLength;
   // True if bloom filter was specified
@@ -123,6 +130,7 @@ public class HColumnDescriptor implements WritableComparable {
     this(columnName == null || columnName.length() <= 0?
       new Text(): new Text(columnName),
       DEFAULT_N_VERSIONS, DEFAULT_COMPRESSION_TYPE, DEFAULT_IN_MEMORY,
+      DEFAULT_BLOCK_CACHE_ENABLED, 
       Integer.MAX_VALUE, DEFAULT_BLOOM_FILTER_DESCRIPTOR);
   }
   
@@ -134,6 +142,7 @@ public class HColumnDescriptor implements WritableComparable {
    * @param compression Compression type
    * @param inMemory If true, column data should be kept in an HRegionServer's
    * cache
+   * @param blockCacheEnabled If true, MapFile blocks should be cached
    * @param maxValueLength Restrict values to &lt;= this value
    * @param bloomFilter Enable the specified bloom filter for this column
    * 
@@ -144,6 +153,7 @@ public class HColumnDescriptor implements WritableComparable {
    */
   public HColumnDescriptor(final Text name, final int maxVersions,
       final CompressionType compression, final boolean inMemory,
+      final boolean blockCacheEnabled,
       final int maxValueLength, final BloomFilterDescriptor bloomFilter) {
     String familyStr = name.toString();
     // Test name if not null (It can be null when deserializing after
@@ -165,6 +175,7 @@ public class HColumnDescriptor implements WritableComparable {
     }
     this.maxVersions = maxVersions;
     this.inMemory = inMemory;
+    this.blockCacheEnabled = blockCacheEnabled;
     this.maxValueLength = maxValueLength;
     this.bloomFilter = bloomFilter;
     this.bloomFilterSpecified = this.bloomFilter == null ? false : true;
@@ -211,6 +222,13 @@ public class HColumnDescriptor implements WritableComparable {
   public boolean isInMemory() {
     return this.inMemory;
   }
+  
+  /**
+   * @return True if MapFile blocks should be cached.
+   */
+  public boolean isBlockCacheEnabled() {
+    return blockCacheEnabled;
+  }
 
   /**
    * @return Maximum value length.
@@ -234,6 +252,7 @@ public class HColumnDescriptor implements WritableComparable {
     return "{name: " + tmp.substring(0, tmp.length() - 1) +
       ", max versions: " + maxVersions +
       ", compression: " + this.compressionType + ", in memory: " + inMemory +
+      ", block cache enabled: " + blockCacheEnabled +
       ", max length: " + maxValueLength + ", bloom filter: " +
       (bloomFilterSpecified ? bloomFilter.toString() : "none") + "}";
   }
@@ -251,6 +270,7 @@ public class HColumnDescriptor implements WritableComparable {
     result ^= Integer.valueOf(this.maxVersions).hashCode();
     result ^= this.compressionType.hashCode();
     result ^= Boolean.valueOf(this.inMemory).hashCode();
+    result ^= Boolean.valueOf(this.blockCacheEnabled).hashCode();
     result ^= Integer.valueOf(this.maxValueLength).hashCode();
     result ^= Boolean.valueOf(this.bloomFilterSpecified).hashCode();
     result ^= Byte.valueOf(this.versionNumber).hashCode();
@@ -277,6 +297,10 @@ public class HColumnDescriptor implements WritableComparable {
       bloomFilter = new BloomFilterDescriptor();
       bloomFilter.readFields(in);
     }
+    
+    if (this.versionNumber > 1) {
+      this.blockCacheEnabled = in.readBoolean();
+    }
   }
 
   /** {@inheritDoc} */
@@ -292,6 +316,8 @@ public class HColumnDescriptor implements WritableComparable {
     if(bloomFilterSpecified) {
       bloomFilter.write(out);
     }
+
+    out.writeBoolean(this.blockCacheEnabled);
   }
 
   // Comparable
@@ -327,6 +353,18 @@ public class HColumnDescriptor implements WritableComparable {
       }
     }
     
+    if(result == 0) {
+      if(this.blockCacheEnabled == other.blockCacheEnabled) {
+        result = 0;
+        
+      } else if(this.blockCacheEnabled) {
+        result = -1;
+        
+      } else {
+        result = 1;
+      }
+    }
+    
     if(result == 0) {
       result = other.maxValueLength - this.maxValueLength;
     }

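Taken together, the HColumnDescriptor changes add a seventh constructor argument (blockCacheEnabled) and an isBlockCacheEnabled() accessor. A minimal sketch of declaring a column family with the block cache switched on, using only the constructor and default constants shown above; the family name "contents:" and the wrapper class are illustrative:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.io.Text;

    public class BlockCacheColumnExample {
      // Build a family descriptor with the block cache switched on; the other
      // arguments keep the defaults defined in HColumnDescriptor.
      public static HColumnDescriptor cachedFamily() {
        return new HColumnDescriptor(new Text("contents:"),
            HColumnDescriptor.DEFAULT_N_VERSIONS,
            HColumnDescriptor.CompressionType.NONE,    // no compression
            HColumnDescriptor.DEFAULT_IN_MEMORY,       // values not pinned in memory
            true,                                      // blockCacheEnabled
            HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH,
            HColumnDescriptor.DEFAULT_BLOOM_FILTER_DESCRIPTOR);
      }
    }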
+ 18 - 5
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java

@@ -739,11 +739,21 @@ public class HStore implements HConstants {
     // Move maxSeqId on by one. Why here?  And not in HRegion?
     this.maxSeqId += 1;
     
-    // Finally, start up all the map readers! (There should be just one at this 
-    // point, as we've compacted them all.)
+    // Finally, start up all the map readers! (There could be more than one
+    // since we haven't compacted yet.)
+    boolean first = true;
     for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
-      this.readers.put(e.getKey(),
-        e.getValue().getReader(this.fs, this.bloomFilter));
+      if (first) {
+        // Use a block cache (if configured) for the first reader only
+        // so as to control memory usage.
+        this.readers.put(e.getKey(),
+            e.getValue().getReader(this.fs, this.bloomFilter,
+                family.isBlockCacheEnabled()));
+        first = false;
+      } else {
+        this.readers.put(e.getKey(),
+          e.getValue().getReader(this.fs, this.bloomFilter));
+      }
     }
   }
   
@@ -1560,7 +1570,10 @@ public class HStore implements HConstants {
           // 6. Loading the new TreeMap.
           Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
           this.readers.put(orderVal,
-            finalCompactedFile.getReader(this.fs, this.bloomFilter));
+            // Use a block cache (if configured) for this reader since
+            // it is the only one.
+            finalCompactedFile.getReader(this.fs, this.bloomFilter,
+                family.isBlockCacheEnabled()));
           this.storefiles.put(orderVal, finalCompactedFile);
         } catch (IOException e) {
           e = RemoteExceptionHandler.checkIOException(e);

+ 88 - 2
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java

@@ -31,9 +31,11 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.BlockFSInputStream;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MapFile;
@@ -422,6 +424,29 @@ public class HStoreFile implements HConstants {
       new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
         conf, bloomFilter);
   }
+  
+  /**
+   * Get reader for the store file map file.
+   * Client is responsible for closing file when done.
+   * @param fs
+   * @param bloomFilter If null, no filtering is done.
+   * @param blockCacheEnabled If true, MapFile blocks should be cached.
+   * @return MapFile.Reader
+   * @throws IOException
+   */
+  public synchronized MapFile.Reader getReader(final FileSystem fs,
+      final Filter bloomFilter, final boolean blockCacheEnabled)
+  throws IOException {
+    
+    if (isReference()) {
+      return new HStoreFile.HalfMapFileReader(fs,
+          getMapFilePath(reference).toString(), conf, 
+          reference.getFileRegion(), reference.getMidkey(), bloomFilter,
+          blockCacheEnabled);
+    }
+    return new BloomFilterMapFile.Reader(fs, getMapFilePath().toString(),
+        conf, bloomFilter, blockCacheEnabled);
+  }
 
   /**
    * Get a store file writer.
@@ -581,7 +606,13 @@ public class HStoreFile implements HConstants {
    */
   static class HbaseMapFile extends MapFile {
 
+    /**
+     * A reader capable of reading and caching blocks of the data file.
+     */
     static class HbaseReader extends MapFile.Reader {
+      
+      private final boolean blockCacheEnabled;
+      
       /**
        * @param fs
        * @param dirName
@@ -590,7 +621,23 @@ public class HStoreFile implements HConstants {
        */
       public HbaseReader(FileSystem fs, String dirName, Configuration conf)
       throws IOException {
-        super(fs, dirName, conf);
+        this(fs, dirName, conf, false);
+      }
+      
+      /**
+       * @param fs
+       * @param dirName
+       * @param conf
+       * @param blockCacheEnabled
+       * @throws IOException
+       */
+      public HbaseReader(FileSystem fs, String dirName, Configuration conf,
+          boolean blockCacheEnabled)
+      throws IOException {
+        super(fs, dirName, null, conf, false); // defer opening streams
+        this.blockCacheEnabled = blockCacheEnabled;
+        open(fs, dirName, null, conf);
+        
         // Force reading of the mapfile index by calling midKey.
         // Reading the index will bring the index into memory over
         // here on the client and then close the index file freeing
@@ -601,6 +648,28 @@ public class HStoreFile implements HConstants {
         // using up datanode resources.  See HADOOP-2341.
         midKey();
       }
+
+      @Override
+      protected org.apache.hadoop.io.SequenceFile.Reader createDataFileReader(
+          FileSystem fs, Path dataFile, Configuration conf)
+      throws IOException {
+        if (!blockCacheEnabled) {
+          return super.createDataFileReader(fs, dataFile, conf);
+        }
+        LOG.info("Block Cache enabled");
+        final int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize",
+            64 * 1024);
+        return new SequenceFile.Reader(fs, dataFile,  conf) {
+          @Override
+          protected FSDataInputStream openFile(FileSystem fs, Path file,
+              int bufferSize, long length) throws IOException {
+            
+            return new FSDataInputStream(new BlockFSInputStream(
+                    super.openFile(fs, file, bufferSize, length), length,
+                    blockSize));
+          }
+        };
+      }
     }
     
     static class HbaseWriter extends MapFile.Writer {
@@ -648,6 +717,13 @@ public class HStoreFile implements HConstants {
         super(fs, dirName, conf);
         bloomFilter = filter;
       }
+
+      public Reader(FileSystem fs, String dirName, Configuration conf,
+          final Filter filter, final boolean blockCacheEnabled)
+      throws IOException {
+        super(fs, dirName, conf, blockCacheEnabled);
+        bloomFilter = filter;
+      }
       
       /** {@inheritDoc} */
       @Override
@@ -741,7 +817,7 @@ public class HStoreFile implements HConstants {
         final Configuration conf, final Range r,
         final WritableComparable midKey)
     throws IOException {
-      this(fs, dirName, conf, r, midKey, null);
+      this(fs, dirName, conf, r, midKey, null, false);
     }
     
     HalfMapFileReader(final FileSystem fs, final String dirName, 
@@ -753,6 +829,16 @@ public class HStoreFile implements HConstants {
       midkey = midKey;
     }
     
+    HalfMapFileReader(final FileSystem fs, final String dirName, 
+        final Configuration conf, final Range r,
+        final WritableComparable midKey, final Filter filter,
+        final boolean blockCacheEnabled)
+    throws IOException {
+      super(fs, dirName, conf, filter, blockCacheEnabled);
+      top = isTopFileRegion(r);
+      midkey = midKey;
+    }
+    
     @SuppressWarnings("unchecked")
     private void checkKey(final WritableComparable key)
     throws IOException {

+ 4 - 4
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java

@@ -43,15 +43,15 @@ public class HTableDescriptor implements WritableComparable {
   public static final HTableDescriptor rootTableDesc =
     new HTableDescriptor(HConstants.ROOT_TABLE_NAME,
         new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
-            HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE,
-            null));
+            HColumnDescriptor.CompressionType.NONE, false, false,
+            Integer.MAX_VALUE, null));
   
   /** table descriptor for meta table */
   public static final HTableDescriptor metaTableDesc =
     new HTableDescriptor(HConstants.META_TABLE_NAME,
         new HColumnDescriptor(HConstants.COLUMN_FAMILY, 1,
-            HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE,
-            null));
+            HColumnDescriptor.CompressionType.NONE, false, false,
+            Integer.MAX_VALUE, null));
   
   private boolean rootregion;
   private boolean metaregion;

+ 5 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/AlterCommand.java

@@ -202,6 +202,8 @@ public class AlterCommand extends SchemaModificationCommand {
             .get(spec)).toUpperCase());
       } else if (spec.equals("IN_MEMORY")) {
         inMemory = (Boolean) columnSpec.get(spec);
+      } else if (spec.equals("BLOCK_CACHE_ENABLED")) {
+        blockCacheEnabled = (Boolean) columnSpec.get(spec);
       } else if (spec.equals("BLOOMFILTER")) {
         bloomFilterType = BloomFilterType.valueOf(((String) columnSpec.get(spec))
             .toUpperCase());
@@ -229,7 +231,8 @@ public class AlterCommand extends SchemaModificationCommand {
     column = appendDelimiter(column);
 
     HColumnDescriptor columnDesc = new HColumnDescriptor(new Text(column),
-        maxVersions, compression, inMemory, maxLength, bloomFilterDesc);
+        maxVersions, compression, inMemory, blockCacheEnabled,
+        maxLength, bloomFilterDesc);
 
     return columnDesc;
   }
@@ -243,6 +246,7 @@ public class AlterCommand extends SchemaModificationCommand {
     maxLength = original.getMaxValueLength();
     compression = original.getCompression();
     inMemory = original.isInMemory();
+    blockCacheEnabled = original.isBlockCacheEnabled();
     bloomFilterDesc = original.getBloomFilter();
   }
 }

+ 6 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/HQLParser.jj

@@ -118,6 +118,7 @@ TOKEN: /** for HQL statements */
    | <BLOCK: "block">
    | <RECORD: "record">
    | <IN_MEMORY: "in_memory">
+   | <BLOCK_CACHE_ENABLED: "block_cache_enabled">
    | <BLOOMFILTER: "bloomfilter">
    | <COUNTING_BLOOMFILTER: "counting_bloomfilter">
    | <RETOUCHED_BLOOMFILTER: "retouched_bloomfilter">
@@ -353,6 +354,11 @@ Map<String, Object> ColumnSpec() :
     { 
       columnSpec.put("IN_MEMORY", true); 
     } 
+   |  
+    <BLOCK_CACHE_ENABLED> 
+    { 
+      columnSpec.put("BLOCK_CACHE_ENABLED", true); 
+    } 
    |  
     <BLOOMFILTER>
     <EQUALS> 

+ 6 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/SchemaModificationCommand.java

@@ -37,6 +37,7 @@ public abstract class SchemaModificationCommand extends BasicCommand {
   protected int maxLength;
   protected HColumnDescriptor.CompressionType compression;
   protected boolean inMemory;
+  protected boolean blockCacheEnabled;
   protected BloomFilterDescriptor bloomFilterDesc;
   protected BloomFilterType bloomFilterType;
   protected int vectorSize;
@@ -52,6 +53,7 @@ public abstract class SchemaModificationCommand extends BasicCommand {
     maxLength = HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH;
     compression = HColumnDescriptor.DEFAULT_COMPRESSION_TYPE;
     inMemory = HColumnDescriptor.DEFAULT_IN_MEMORY;
+    blockCacheEnabled = HColumnDescriptor.DEFAULT_BLOCK_CACHE_ENABLED;
     bloomFilterDesc = HColumnDescriptor.DEFAULT_BLOOM_FILTER_DESCRIPTOR;
   }
 
@@ -76,6 +78,8 @@ public abstract class SchemaModificationCommand extends BasicCommand {
             .valueOf(((String) columnSpec.get(spec)).toUpperCase());
       } else if (spec.equals("IN_MEMORY")) {
         inMemory = (Boolean) columnSpec.get(spec);
+      } else if (spec.equals("BLOCK_CACHE_ENABLED")) {
+        blockCacheEnabled = (Boolean) columnSpec.get(spec);
       } else if (spec.equals("BLOOMFILTER")) {
         bloomFilterType = BloomFilterType.valueOf(((String) columnSpec.get(spec))
             .toUpperCase());
@@ -103,7 +107,8 @@ public abstract class SchemaModificationCommand extends BasicCommand {
     column = appendDelimiter(column);
 
     HColumnDescriptor columnDesc = new HColumnDescriptor(new Text(column),
-        maxVersions, compression, inMemory, maxLength, bloomFilterDesc);
+        maxVersions, compression, inMemory, blockCacheEnabled,
+        maxLength, bloomFilterDesc);
 
     return columnDesc;
   }

+ 23 - 18
src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/generated/HQLParser.java

@@ -75,7 +75,7 @@ public class HQLParser implements HQLParserConstants {
     case SELECT:
     case ENABLE:
     case DISABLE:
-    case 68:
+    case 69:
       switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
       case HELP:
       case ALTER:
@@ -100,7 +100,7 @@ public class HQLParser implements HQLParserConstants {
         jj_la1[0] = jj_gen;
         ;
       }
-      jj_consume_token(68);
+      jj_consume_token(69);
       break;
     case 0:
       jj_consume_token(0);
@@ -390,6 +390,7 @@ public class HQLParser implements HQLParserConstants {
       case MAX_LENGTH:
       case COMPRESSION:
       case IN_MEMORY:
+      case BLOCK_CACHE_ENABLED:
       case BLOOMFILTER:
       case VECTOR_SIZE:
       case NUM_HASH:
@@ -440,6 +441,10 @@ public class HQLParser implements HQLParserConstants {
         jj_consume_token(IN_MEMORY);
       columnSpec.put("IN_MEMORY", true);
         break;
+      case BLOCK_CACHE_ENABLED:
+        jj_consume_token(BLOCK_CACHE_ENABLED);
+      columnSpec.put("BLOCK_CACHE_ENABLED", true);
+        break;
       case BLOOMFILTER:
         jj_consume_token(BLOOMFILTER);
         jj_consume_token(EQUALS);
@@ -1080,33 +1085,33 @@ public class HQLParser implements HQLParserConstants {
     finally { jj_save(0, xla); }
   }
 
-  final private boolean jj_3R_11() {
-    if (jj_scan_token(ID)) return true;
+  final private boolean jj_3_1() {
+    if (jj_scan_token(ADD)) return true;
+    if (jj_3R_10()) return true;
     return false;
   }
 
-  final private boolean jj_3R_10() {
+  final private boolean jj_3R_12() {
     Token xsp;
     xsp = jj_scanpos;
-    if (jj_3R_11()) {
+    if (jj_scan_token(67)) {
     jj_scanpos = xsp;
-    if (jj_3R_12()) return true;
+    if (jj_scan_token(68)) return true;
     }
     return false;
   }
 
-  final private boolean jj_3_1() {
-    if (jj_scan_token(ADD)) return true;
-    if (jj_3R_10()) return true;
+  final private boolean jj_3R_11() {
+    if (jj_scan_token(ID)) return true;
     return false;
   }
 
-  final private boolean jj_3R_12() {
+  final private boolean jj_3R_10() {
     Token xsp;
     xsp = jj_scanpos;
-    if (jj_scan_token(66)) {
+    if (jj_3R_11()) {
     jj_scanpos = xsp;
-    if (jj_scan_token(67)) return true;
+    if (jj_3R_12()) return true;
     }
     return false;
   }
@@ -1133,10 +1138,10 @@ public class HQLParser implements HQLParserConstants {
       jj_la1_0 = new int[] {0xf3ffe0,0xf3ffe1,0xf3ffe0,0x0,0x0,0x0,0x0,0x33dbc0,0x33dbc0,0x0,0x600,0x0,0x0,0x0,0x0,0x0,0x0,0x1000,0x0,0x80000000,0x0,0x2000000,0x0,0x3000000,0x8000000,0x3000000,0x80000000,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,};
    }
    private static void jj_la1_1() {
-      jj_la1_1 = new int[] {0x0,0x0,0x0,0x40000000,0xc0000000,0xc0000000,0x40000000,0x40000000,0x40000000,0x40000000,0x0,0x731c000,0xe0000,0xe00000,0x731c000,0x10,0x10,0x18000000,0x0,0x0,0x0,0x0,0xe0002000,0x0,0x0,0x0,0x0,0x1,0x2,0x10,0x0,0xc0002000,0xc0002000,0xc0002000,0x0,0xc0002000,0x10,0x10,0x10,0xc0000000,0x0,0x40000000,};
+      jj_la1_1 = new int[] {0x0,0x0,0x0,0x80000000,0x80000000,0x80000000,0x80000000,0x80000000,0x80000000,0x80000000,0x0,0xe71c000,0xe0000,0x1c00000,0xe71c000,0x10,0x10,0x30000000,0x0,0x0,0x0,0x0,0xc0002000,0x0,0x0,0x0,0x0,0x1,0x2,0x10,0x0,0x80002000,0x80002000,0x80002000,0x0,0x80002000,0x10,0x10,0x10,0x80000000,0x0,0x80000000,};
    }
    private static void jj_la1_2() {
-      jj_la1_2 = new int[] {0x0,0x10,0x0,0x0,0x1,0x1,0xc,0x0,0x0,0xc,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0xc,0x0,0xc,0x0,0xc,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0xc,0x0,0xc,0x0,0xc,0xc,0x0,0x0,0x0,0x0,0xc,0xc,};
+      jj_la1_2 = new int[] {0x0,0x20,0x0,0x0,0x3,0x3,0x18,0x0,0x0,0x18,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x18,0x0,0x18,0x0,0x19,0x0,0x0,0x0,0x0,0x0,0x0,0x0,0x18,0x1,0x19,0x1,0x18,0x19,0x0,0x0,0x0,0x1,0x18,0x18,};
    }
   final private JJCalls[] jj_2_rtns = new JJCalls[1];
   private boolean jj_rescan = false;
@@ -1313,8 +1318,8 @@ public class HQLParser implements HQLParserConstants {
 
   public ParseException generateParseException() {
     jj_expentries.removeAllElements();
-    boolean[] la1tokens = new boolean[69];
-    for (int i = 0; i < 69; i++) {
+    boolean[] la1tokens = new boolean[70];
+    for (int i = 0; i < 70; i++) {
       la1tokens[i] = false;
     }
     if (jj_kind >= 0) {
@@ -1336,7 +1341,7 @@ public class HQLParser implements HQLParserConstants {
         }
       }
     }
-    for (int i = 0; i < 69; i++) {
+    for (int i = 0; i < 70; i++) {
       if (la1tokens[i]) {
         jj_expentry = new int[1];
         jj_expentry[0] = i;

+ 17 - 15
src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/generated/HQLParserConstants.java

@@ -52,21 +52,22 @@ public interface HQLParserConstants {
   int BLOCK = 50;
   int RECORD = 51;
   int IN_MEMORY = 52;
-  int BLOOMFILTER = 53;
-  int COUNTING_BLOOMFILTER = 54;
-  int RETOUCHED_BLOOMFILTER = 55;
-  int VECTOR_SIZE = 56;
-  int NUM_HASH = 57;
-  int NUM_ENTRIES = 58;
-  int ADD = 59;
-  int CHANGE = 60;
-  int COUNT = 61;
-  int ID = 62;
-  int INTEGER_LITERAL = 63;
-  int FLOATING_POINT_LITERAL = 64;
-  int EXPONENT = 65;
-  int QUOTED_IDENTIFIER = 66;
-  int STRING_LITERAL = 67;
+  int BLOCK_CACHE_ENABLED = 53;
+  int BLOOMFILTER = 54;
+  int COUNTING_BLOOMFILTER = 55;
+  int RETOUCHED_BLOOMFILTER = 56;
+  int VECTOR_SIZE = 57;
+  int NUM_HASH = 58;
+  int NUM_ENTRIES = 59;
+  int ADD = 60;
+  int CHANGE = 61;
+  int COUNT = 62;
+  int ID = 63;
+  int INTEGER_LITERAL = 64;
+  int FLOATING_POINT_LITERAL = 65;
+  int EXPONENT = 66;
+  int QUOTED_IDENTIFIER = 67;
+  int STRING_LITERAL = 68;
 
   int DEFAULT = 0;
 
@@ -124,6 +125,7 @@ public interface HQLParserConstants {
     "\"block\"",
     "\"record\"",
     "\"in_memory\"",
+    "\"block_cache_enabled\"",
     "\"bloomfilter\"",
     "\"counting_bloomfilter\"",
     "\"retouched_bloomfilter\"",

The diff for this file is too large to display
+ 205 - 173
src/contrib/hbase/src/java/org/apache/hadoop/hbase/hql/generated/HQLParserTokenManager.java


+ 206 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BlockFSInputStream.java

@@ -0,0 +1,206 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.DataInputBuffer;
+
+/**
+ * An implementation of {@link FSInputStream} that reads the stream in blocks
+ * of a fixed, configurable size. The blocks are stored in a memory-sensitive cache.
+ */
+public class BlockFSInputStream extends FSInputStream {
+  
+  static final Log LOG = LogFactory.getLog(BlockFSInputStream.class);
+  
+  private final InputStream in;
+
+  private final long fileLength;
+
+  private final int blockSize;
+  private final Map<Long, byte[]> blocks;
+
+  private boolean closed;
+
+  private DataInputBuffer blockStream = new DataInputBuffer();
+
+  private long blockEnd = -1;
+
+  private long pos = 0;
+
+  /**
+   * @param in
+   * @param fileLength
+   * @param blockSize the size of each block in bytes.
+   */
+  @SuppressWarnings("unchecked")
+  public BlockFSInputStream(InputStream in, long fileLength, int blockSize) {
+    this.in = in;
+    if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+      throw new IllegalArgumentException(
+          "In is not an instance of Seekable or PositionedReadable");
+    }
+    this.fileLength = fileLength;
+    this.blockSize = blockSize;
+    // a memory-sensitive map that has soft references to values
+    this.blocks = new ReferenceMap() {
+      private long hits, misses;
+      @Override
+      public Object get(Object key) {
+        Object value = super.get(key);
+        if (value == null) {
+          misses++;
+        } else {
+          hits++;
+        }
+        if (LOG.isDebugEnabled() && ((hits + misses) % 10000) == 0) {
+          long hitRate = (100 * hits) / (hits + misses);
+          LOG.info("Hit rate for cache " + hashCode() + ": " + hitRate + "%");
+        }
+        return value;
+      }
+    };
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return pos;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    return (int) (fileLength - pos);
+  }
+
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    if (targetPos > fileLength) {
+      throw new IOException("Cannot seek after EOF");
+    }
+    pos = targetPos;
+    blockEnd = -1;
+  }
+
+  @Override
+  public synchronized boolean seekToNewSource(long targetPos)
+      throws IOException {
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    int result = -1;
+    if (pos < fileLength) {
+      if (pos > blockEnd) {
+        blockSeekTo(pos);
+      }
+      result = blockStream.read();
+      if (result >= 0) {
+        pos++;
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (pos < fileLength) {
+      if (pos > blockEnd) {
+        blockSeekTo(pos);
+      }
+      int realLen = Math.min(len, (int) (blockEnd - pos + 1));
+      int result = blockStream.read(buf, off, realLen);
+      if (result >= 0) {
+        pos += result;
+      }
+      return result;
+    }
+    return -1;
+  }
+
+  private synchronized void blockSeekTo(long target) throws IOException {
+    int targetBlock = (int) (target / blockSize);
+    long targetBlockStart = targetBlock * blockSize;
+    long targetBlockEnd = Math.min(targetBlockStart + blockSize, fileLength) - 1;
+    long blockLength = targetBlockEnd - targetBlockStart + 1;
+    long offsetIntoBlock = target - targetBlockStart;
+
+    byte[] block = blocks.get(targetBlockStart);
+    if (block == null) {
+      block = new byte[blockSize];
+      ((PositionedReadable) in).readFully(targetBlockStart, block, 0,
+          (int) blockLength);
+      blocks.put(targetBlockStart, block);
+    }
+    
+    this.pos = target;
+    this.blockEnd = targetBlockEnd;
+    this.blockStream.reset(block, (int) offsetIntoBlock,
+        (int) (blockLength - offsetIntoBlock));
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    if (blockStream != null) {
+      blockStream.close();
+      blockStream = null;
+    }
+    super.close();
+    closed = true;
+  }
+
+  /**
+   * We don't support marks.
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readLimit) {
+    // Do nothing
+  }
+
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("Mark not supported");
+  }
+
+}

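BlockFSInputStream wraps any stream that is both Seekable and PositionedReadable and serves reads from fixed-size blocks held in a memory-sensitive cache. A minimal usage sketch, mirroring how HStoreFile wires it in above; the helper class, method name, and caller-supplied length are illustrative assumptions:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.BlockFSInputStream;

    public class BlockCachedOpenExample {
      // Wrap a raw stream so reads are served from fixed-size cached blocks.
      // The caller supplies the file length; the block size comes from the new
      // hbase.hstore.blockCache.blockSize property with the patch's 64 KB default.
      public static FSDataInputStream openCached(FileSystem fs, Path file,
          long length, Configuration conf) throws IOException {
        int blockSize = conf.getInt("hbase.hstore.blockCache.blockSize", 64 * 1024);
        FSDataInputStream raw = fs.open(file);   // Seekable and PositionedReadable
        return new FSDataInputStream(new BlockFSInputStream(raw, length, blockSize));
      }
    }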
+ 2 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift

@@ -64,7 +64,8 @@ struct ColumnDescriptor {
   5:i32 maxValueLength = 2147483647,
   6:string bloomFilterType = "NONE",
   7:i32 bloomFilterVectorSize = 0,
-  8:i32 bloomFilterNbHashes = 0
+  8:i32 bloomFilterNbHashes = 0,
+  9:bool blockCacheEnabled = 0
 }
 
 /**

+ 3 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java

@@ -58,7 +58,8 @@ public class ThriftUtilities {
       throw new IllegalArgument("column name is empty");
     }
     HColumnDescriptor col = new HColumnDescriptor(new Text(in.name),
-        in.maxVersions, comp, in.inMemory, in.maxValueLength, bloom);
+        in.maxVersions, comp, in.inMemory, in.blockCacheEnabled,
+        in.maxValueLength, bloom);
     return col;
   }
   
@@ -76,6 +77,7 @@ public class ThriftUtilities {
     col.maxVersions = in.getMaxVersions();
     col.compression = in.getCompression().toString();
     col.inMemory = in.isInMemory();
+    col.blockCacheEnabled = in.isBlockCacheEnabled();
     col.maxValueLength = in.getMaxValueLength();
     BloomFilterDescriptor bloom = in.getBloomFilter();
     if (bloom != null) {

+ 24 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java

@@ -46,6 +46,7 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
   public String bloomFilterType;
   public int bloomFilterVectorSize;
   public int bloomFilterNbHashes;
+  public boolean blockCacheEnabled;
 
   public final Isset __isset = new Isset();
   public static final class Isset {
@@ -57,6 +58,7 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
     public boolean bloomFilterType = false;
     public boolean bloomFilterVectorSize = false;
     public boolean bloomFilterNbHashes = false;
+    public boolean blockCacheEnabled = false;
   }
 
   public ColumnDescriptor() {
@@ -74,6 +76,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
 
     this.bloomFilterNbHashes = 0;
 
+    this.blockCacheEnabled = false;
+
   }
 
   public ColumnDescriptor(
@@ -84,7 +88,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
     int maxValueLength,
     String bloomFilterType,
     int bloomFilterVectorSize,
-    int bloomFilterNbHashes)
+    int bloomFilterNbHashes,
+    boolean blockCacheEnabled)
   {
     this();
     this.name = name;
@@ -103,6 +108,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
     this.__isset.bloomFilterVectorSize = true;
     this.bloomFilterNbHashes = bloomFilterNbHashes;
     this.__isset.bloomFilterNbHashes = true;
+    this.blockCacheEnabled = blockCacheEnabled;
+    this.__isset.blockCacheEnabled = true;
   }
 
   public void read(TProtocol iprot) throws TException {
@@ -180,6 +187,14 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
             TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 9:
+          if (field.type == TType.BOOL) {
+            this.blockCacheEnabled = iprot.readBool();
+            this.__isset.blockCacheEnabled = true;
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           TProtocolUtil.skip(iprot, field.type);
           break;
@@ -247,6 +262,12 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
     oprot.writeFieldBegin(field);
     oprot.writeI32(this.bloomFilterNbHashes);
     oprot.writeFieldEnd();
+    field.name = "blockCacheEnabled";
+    field.type = TType.BOOL;
+    field.id = 9;
+    oprot.writeFieldBegin(field);
+    oprot.writeBool(this.blockCacheEnabled);
+    oprot.writeFieldEnd();
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -269,6 +290,8 @@ public class ColumnDescriptor implements TBase, java.io.Serializable {
     sb.append(this.bloomFilterVectorSize);
     sb.append(",bloomFilterNbHashes:");
     sb.append(this.bloomFilterNbHashes);
+    sb.append(",blockCacheEnabled:");
+    sb.append(this.blockCacheEnabled);
     sb.append(")");
     return sb.toString();
   }

+ 3 - 3
src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java

@@ -184,11 +184,11 @@ public abstract class HBaseTestCase extends TestCase {
       final int versions) {
     HTableDescriptor htd = new HTableDescriptor(name);
     htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions,
-      CompressionType.NONE, false, Integer.MAX_VALUE, null));
+      CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
     htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions,
-      CompressionType.NONE, false, Integer.MAX_VALUE, null));
+      CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
     htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions,
-      CompressionType.NONE, false, Integer.MAX_VALUE, null));
+      CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
     return htd;
   }
   

+ 2 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBloomFilters.java

@@ -169,6 +169,7 @@ public class TestBloomFilters extends HBaseClusterTestCase {
             1,                                        // Max versions
             HColumnDescriptor.CompressionType.NONE,   // no compression
             HColumnDescriptor.DEFAULT_IN_MEMORY,      // not in memory
+            HColumnDescriptor.DEFAULT_BLOCK_CACHE_ENABLED,
             HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH,
             bloomFilter
         )
@@ -234,6 +235,7 @@ public class TestBloomFilters extends HBaseClusterTestCase {
             1,                                        // Max versions
             HColumnDescriptor.CompressionType.NONE,   // no compression
             HColumnDescriptor.DEFAULT_IN_MEMORY,      // not in memory
+            HColumnDescriptor.DEFAULT_BLOCK_CACHE_ENABLED,
             HColumnDescriptor.DEFAULT_MAX_VALUE_LENGTH,
             bloomFilter
         )

+ 1 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java

@@ -337,7 +337,7 @@ public class TestTimestamp extends HBaseTestCase {
   private HRegion createRegion() throws IOException {
     HTableDescriptor htd = createTableDescriptor(getName());
     htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
-      CompressionType.NONE, false, Integer.MAX_VALUE, null));
+      CompressionType.NONE, false, false, Integer.MAX_VALUE, null));
     return createNewHRegion(htd, null, null);
   }
 }

+ 5 - 4
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestToString.java

@@ -44,8 +44,9 @@ public class TestToString extends TestCase {
     HTableDescriptor htd = HTableDescriptor.rootTableDesc;
    System.out.println(htd.toString());
     assertEquals("Table descriptor", "name: -ROOT-, families: {info:={name: " +
-        "info, max versions: 1, compression: NONE, in memory: false, max " +
-        "length: 2147483647, bloom filter: none}}", htd.toString());
+        "info, max versions: 1, compression: NONE, in memory: false, " +
+        "block cache enabled: false, max length: 2147483647, " +
+        "bloom filter: none}}", htd.toString());
   }
   
   /**
@@ -57,7 +58,7 @@ public class TestToString extends TestCase {
     assertEquals("HRegionInfo", 
       "regionname: -ROOT-,,0, startKey: <>, endKey: <>, encodedName: 70236052, tableDesc: " +
       "{name: -ROOT-, families: {info:={name: info, max versions: 1, " +
-      "compression: NONE, in memory: false, max length: 2147483647, bloom " +
-      "filter: none}}}", hri.toString());
+      "compression: NONE, in memory: false, block cache enabled: false, " +
+      "max length: 2147483647, bloom filter: none}}}", hri.toString());
   }
 }

+ 121 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestBlockFSInputStream.java

@@ -0,0 +1,121 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+
+public class TestBlockFSInputStream extends TestCase {
+  
+  static class InMemoryFSInputStream extends FSInputStream {
+
+    private byte[] data;
+    private DataInputBuffer din = new DataInputBuffer();
+    
+    public InMemoryFSInputStream(byte[] data) {
+      this.data = data;
+      din.reset(data, data.length);
+    }
+    
+    @Override
+    public long getPos() throws IOException {
+      return din.getPosition();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      if (pos > data.length) {
+        throw new IOException("Cannot seek after EOF");
+      }
+      din.reset(data, (int) pos, data.length - (int) pos);
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+      return din.read();
+    }
+    
+  }
+  
+  private byte[] data;
+  private BlockFSInputStream stream;
+  
+  @Override
+  protected void setUp() throws Exception {
+    data = new byte[34];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = (byte) i;
+    }
+    FSInputStream byteStream = new InMemoryFSInputStream(data);
+    stream = new BlockFSInputStream(byteStream, 34, 10);
+  }
+
+  public void testReadForwards() throws IOException {
+    for (int i = 0; i < data.length; i++) {
+      assertEquals(i, stream.getPos());
+      assertEquals(i, stream.read());
+    }
+
+  }
+  
+  public void testReadBackwards() throws IOException {
+    for (int i = data.length - 1; i >= 0; i--) {
+      stream.seek(i);
+      assertEquals(i, stream.getPos());
+      assertEquals(i, stream.read());
+    }
+  }
+  
+  public void testReadInChunks() throws IOException {
+    
+    byte[] buf = new byte[data.length];
+    int chunkLength = 6;
+    
+    assertEquals(6, stream.read(buf, 0, chunkLength));
+    assertEquals(4, stream.read(buf, 6, chunkLength));
+    
+    assertEquals(6, stream.read(buf, 10, chunkLength));
+    assertEquals(4, stream.read(buf, 16, chunkLength));
+    
+    assertEquals(6, stream.read(buf, 20, chunkLength));
+    assertEquals(4, stream.read(buf, 26, chunkLength));
+
+    assertEquals(4, stream.read(buf, 30, chunkLength));
+    
+    assertEquals(0, stream.available());
+    
+    assertEquals(-1, stream.read());
+    
+    for (int i = 0; i < buf.length; i++) {
+      assertEquals(i, buf[i]);
+    }
+
+
+  }
+}

+ 29 - 1
src/java/org/apache/hadoop/io/MapFile.java

@@ -246,12 +246,31 @@ public class MapFile {
     /** Construct a map reader for the named map using the named comparator.*/
     public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
       throws IOException {
+      this(fs, dirName, comparator, conf, true);
+    }
+    
+    /**
+     * Hook to allow subclasses to defer opening streams until further
+     * initialization is complete.
+     * @see #createDataFileReader(FileSystem, Path, Configuration)
+     */
+    protected Reader(FileSystem fs, String dirName,
+        WritableComparator comparator, Configuration conf, boolean open)
+      throws IOException {
+      
+      if (open) {
+        open(fs, dirName, comparator, conf);
+      }
+    }
+    
+    protected synchronized void open(FileSystem fs, String dirName,
+        WritableComparator comparator, Configuration conf) throws IOException {
       Path dir = new Path(dirName);
       Path dataFile = new Path(dir, DATA_FILE_NAME);
       Path indexFile = new Path(dir, INDEX_FILE_NAME);
 
       // open the data
-      this.data = new SequenceFile.Reader(fs, dataFile,  conf);
+      this.data = createDataFileReader(fs, dataFile, conf);
       this.firstPosition = data.getPosition();
 
       if (comparator == null)
@@ -263,6 +282,15 @@ public class MapFile {
       this.index = new SequenceFile.Reader(fs, indexFile, conf);
     }
 
+    /**
+     * Override this method to specialize the type of
+     * {@link SequenceFile.Reader} returned.
+     */
+    protected SequenceFile.Reader createDataFileReader(FileSystem fs,
+        Path dataFile, Configuration conf) throws IOException {
+      return new SequenceFile.Reader(fs, dataFile,  conf);
+    }
+
     private void readIndex() throws IOException {
       // read the index entirely into memory
       if (this.keys != null)

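The MapFile.Reader changes add two hooks: a protected constructor that can defer opening the underlying streams, and an overridable createDataFileReader(). A standalone sketch, not part of this patch, of a subclass that initializes its own state before calling open() so the override sees a fully constructed object — the same pattern HbaseReader follows above; the class name and logging are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.SequenceFile;

    public class LoggingMapFileReader extends MapFile.Reader {
      private final String label;   // field that must exist before open() runs

      public LoggingMapFileReader(FileSystem fs, String dirName,
          Configuration conf, String label) throws IOException {
        super(fs, dirName, null, conf, false);  // defer opening streams
        this.label = label;
        open(fs, dirName, null, conf);          // now the override below is safe
      }

      @Override
      protected SequenceFile.Reader createDataFileReader(FileSystem fs,
          Path dataFile, Configuration conf) throws IOException {
        System.out.println(label + ": opening data file " + dataFile);
        return super.createDataFileReader(fs, dataFile, conf);
      }
    }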
+ 10 - 1
src/java/org/apache/hadoop/io/SequenceFile.java

@@ -1391,12 +1391,21 @@ public class SequenceFile {
                    long length, Configuration conf, boolean tempReader) 
     throws IOException {
       this.file = file;
-      this.in = fs.open(file, bufferSize);
+      this.in = openFile(fs, file, bufferSize, length);
       this.conf = conf;
       seek(start);
       this.end = in.getPos() + length;
       init(tempReader);
     }
+
+    /**
+     * Override this method to specialize the type of
+     * {@link FSDataInputStream} returned.
+     */
+    protected FSDataInputStream openFile(FileSystem fs, Path file,
+        int bufferSize, long length) throws IOException {
+      return fs.open(file, bufferSize);
+    }
     
     private Decompressor getPooledOrNewDecompressor() {
       Decompressor decompressor = null;

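SequenceFile.Reader gains the matching openFile() hook, called when the data file is opened. A minimal, assumed sketch of a subclass using it; the patch's own use is the anonymous reader inside HStoreFile above, and the class name and logging here are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;

    public class CountingSequenceFileReader extends SequenceFile.Reader {
      public CountingSequenceFileReader(FileSystem fs, Path file, Configuration conf)
          throws IOException {
        super(fs, file, conf);
      }

      // Note: this runs while the superclass constructor executes, so it must
      // not rely on subclass fields. The default implementation simply calls
      // fs.open(file, bufferSize); any decoration of the raw stream goes here.
      @Override
      protected FSDataInputStream openFile(FileSystem fs, Path file,
          int bufferSize, long length) throws IOException {
        System.out.println("Opening " + file + " (" + length + " bytes)");
        return super.openFile(fs, file, bufferSize, length);
      }
    }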
Some files were not shown because too many files changed in this diff