
HADOOP-2139 (phase 2) Make region server more event driven

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@597959 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman, 17 years ago
Parent commit: 1cc5e16489
25 changed files with 1086 additions and 728 deletions
  1.  +1   -0    src/contrib/hbase/CHANGES.txt
  2.  +8   -0    src/contrib/hbase/conf/hbase-default.xml
  3.  +36  -0    src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java
  4.  +138 -122  src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
  5.  +356 -334  src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
  6.  +6   -5    src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
  7.  +55  -58   src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
  8.  +18  -8    src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
  9.  +352 -133  src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
  10. +29  -0    src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java
  11. +5   -1    src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java
  12. +10  -1    src/contrib/hbase/src/test/hbase-site.xml
  13. +3   -3    src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
  14. +1   -1    src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
  15. +4   -2    src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
  16. +3   -2    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
  17. +4   -4    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
  18. +5   -1    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
  19. +1   -1    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java
  20. +2   -2    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
  21. +27  -32   src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java
  22. +5   -5    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
  23. +4   -9    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
  24. +2   -2    src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
  25. +11  -2    src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

+ 1 - 0
src/contrib/hbase/CHANGES.txt

@@ -42,6 +42,7 @@ Trunk (unreleased changes)
     HADOOP-2176 Htable.deleteAll documentation is ambiguous
     HADOOP-2139 (phase 1) Increase parallelism in region servers.
     HADOOP-2267 [Hbase Shell] Change the prompt's title from 'hbase' to 'hql'.
+    HADOOP-2139 (phase 2) Make region server more event driven
 
 Release 0.15.1
 Branch 0.15

+ 8 - 0
src/contrib/hbase/conf/hbase-default.xml

@@ -143,6 +143,14 @@
     hbase.server.thread.wakefrequency.
     </description>
   </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>60000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
   <property>
     <name>hbase.hregion.memcache.flush.size</name>
     <value>16777216</value>

+ 36 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/CacheFlushListener.java

@@ -0,0 +1,36 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Implementors of this interface want to be notified when an HRegion
+ * determines that a cache flush is needed. A CacheFlushListener (or null)
+ * must be passed to the HRegion constructor.
+ */
+public interface CacheFlushListener {
+
+  /**
+   * Tell the listener the cache needs to be flushed.
+   * 
+   * @param region the HRegion requesting the cache flush
+   */
+  void flushRequested(HRegion region);
+}
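
An aside, not part of the commit: CacheFlushListener decouples noticing that a region needs a flush from performing it. A minimal hypothetical implementor (the commit's real one is HRegionServer.Flusher, in the HRegionServer diff below) could simply queue regions for a worker thread:

// Illustrative sketch only; QueueingFlushListener is hypothetical and
// not part of this commit.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueingFlushListener implements CacheFlushListener {
  private final BlockingQueue<HRegion> pending =
    new LinkedBlockingQueue<HRegion>();

  public void flushRequested(HRegion region) {
    pending.add(region);  // hand off; a worker thread drains this queue
  }
}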

+ 138 - 122
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java

@@ -22,10 +22,13 @@ package org.apache.hadoop.hbase;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -89,7 +92,9 @@ public class HLog implements HConstants {
   final FileSystem fs;
   final Path dir;
   final Configuration conf;
+  final LogRollListener listener;
   final long threadWakeFrequency;
+  private final int maxlogentries;
 
   /*
    * Current log file.
@@ -99,12 +104,13 @@ public class HLog implements HConstants {
   /*
    * Map of all log files but the current one. 
    */
-  final TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
+  final SortedMap<Long, Path> outputfiles = 
+    Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
 
   /*
    * Map of region to last sequence/edit id. 
    */
-  final Map<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
+  final Map<Text, Long> lastSeqWritten = new ConcurrentHashMap<Text, Long>();
 
   volatile boolean closed = false;
 
@@ -119,6 +125,10 @@ public class HLog implements HConstants {
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
 
+  // We synchronize on updateLock to prevent updates and to prevent a log roll
+  // during an update
+  private final Integer updateLock = new Integer(0);
+
   /**
    * Split up a bunch of log files, that are no longer being written to, into
    * new files, one per region. Delete the old log files when finished.
@@ -207,12 +217,15 @@ public class HLog implements HConstants {
    * @param conf
    * @throws IOException
    */
-  HLog(final FileSystem fs, final Path dir, final Configuration conf)
-  throws IOException {
+  HLog(final FileSystem fs, final Path dir, final Configuration conf,
+      final LogRollListener listener) throws IOException {
     this.fs = fs;
     this.dir = dir;
     this.conf = conf;
+    this.listener = listener;
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.maxlogentries =
+      conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -256,98 +269,82 @@ public class HLog implements HConstants {
    *
    * @throws IOException
    */
-  synchronized void rollWriter() throws IOException {
-    boolean locked = false;
-    while (!locked && !closed) {
-      if (this.cacheFlushLock.tryLock()) {
-        locked = true;
-        break;
-      }
-      try {
-        this.wait(threadWakeFrequency);
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
-    if (closed) {
-      if (locked) {
-        this.cacheFlushLock.unlock();
-      }
-      throw new IOException("Cannot roll log; log is closed");
-    }
-
-    // If we get here we have locked out both cache flushes and appends
+  void rollWriter() throws IOException {
+    this.cacheFlushLock.lock();
     try {
-      if (this.writer != null) {
-        // Close the current writer, get a new one.
-        this.writer.close();
-        Path p = computeFilename(filenum - 1);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Closing current log writer " + p.toString() +
+      if (closed) {
+        return;
+      }
+      synchronized (updateLock) {
+        if (this.writer != null) {
+          // Close the current writer, get a new one.
+          this.writer.close();
+          Path p = computeFilename(filenum - 1);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing current log writer " + p.toString() +
             " to get a new one");
-        }
-        if (filenum > 0) {
-          synchronized (this.sequenceLock) {
-            this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
           }
-        }
-      }
-      Path newPath = computeFilename(filenum++);
-      this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
-          HLogKey.class, HLogEdit.class);
-      LOG.info("new log writer created at " + newPath);
-
-      // Can we delete any of the old log files?
-      if (this.outputfiles.size() > 0) {
-        if (this.lastSeqWritten.size() <= 0) {
-          LOG.debug("Last sequence written is empty. Deleting all old hlogs");
-          // If so, then no new writes have come in since all regions were
-          // flushed (and removed from the lastSeqWritten map). Means can
-          // remove all but currently open log file.
-          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-            deleteLogFile(e.getValue(), e.getKey());
+          if (filenum > 0) {
+            synchronized (this.sequenceLock) {
+              this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
+            }
           }
-          this.outputfiles.clear();
-        } else {
-          // Get oldest edit/sequence id.  If logs are older than this id,
-          // then safe to remove.
-          TreeSet<Long> sequenceNumbers =
-            new TreeSet<Long>(this.lastSeqWritten.values());
-          long oldestOutstandingSeqNum = sequenceNumbers.first().longValue();
-          // Get the set of all log files whose final ID is older than the
-          // oldest pending region operation
-          sequenceNumbers.clear();
-          sequenceNumbers.addAll(this.outputfiles.headMap(
-              Long.valueOf(oldestOutstandingSeqNum)).keySet());
-          // Now remove old log files (if any)
-          if (LOG.isDebugEnabled()) {
-            // Find region associated with oldest key -- helps debugging.
-            Text oldestRegion = null;
-            for (Map.Entry<Text, Long> e: this.lastSeqWritten.entrySet()) {
-              if (e.getValue().longValue() == oldestOutstandingSeqNum) {
-                oldestRegion = e.getKey();
-                break;
+        }
+        Path newPath = computeFilename(filenum++);
+        this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
+            HLogKey.class, HLogEdit.class);
+        LOG.info("new log writer created at " + newPath);
+
+        // Can we delete any of the old log files?
+        if (this.outputfiles.size() > 0) {
+          if (this.lastSeqWritten.size() <= 0) {
+            LOG.debug("Last sequence written is empty. Deleting all old hlogs");
+            // If so, then no new writes have come in since all regions were
+            // flushed (and removed from the lastSeqWritten map). Means can
+            // remove all but currently open log file.
+            for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+              deleteLogFile(e.getValue(), e.getKey());
+            }
+            this.outputfiles.clear();
+          } else {
+            // Get oldest edit/sequence id.  If logs are older than this id,
+            // then safe to remove.
+            Long oldestOutstandingSeqNum =
+              Collections.min(this.lastSeqWritten.values());
+            // Get the set of all log files whose final ID is older than or
+            // equal to the oldest pending region operation
+            TreeSet<Long> sequenceNumbers =
+              new TreeSet<Long>(this.outputfiles.headMap(
+                (oldestOutstandingSeqNum + Long.valueOf(1L))).keySet());
+            // Now remove old log files (if any)
+            if (LOG.isDebugEnabled()) {
+              // Find region associated with oldest key -- helps debugging.
+              Text oldestRegion = null;
+              for (Map.Entry<Text, Long> e: this.lastSeqWritten.entrySet()) {
+                if (e.getValue().longValue() == oldestOutstandingSeqNum) {
+                  oldestRegion = e.getKey();
+                  break;
+                }
               }
+              LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
+                  "using oldest outstanding seqnum of " +
+                  oldestOutstandingSeqNum + " from region " + oldestRegion);
             }
-            LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
-              "using oldest outstanding seqnum of " + oldestOutstandingSeqNum +
-              " from region " + oldestRegion);
-          }
-          if (sequenceNumbers.size() > 0) {
-            for (Long seq : sequenceNumbers) {
-              deleteLogFile(this.outputfiles.remove(seq), seq);
+            if (sequenceNumbers.size() > 0) {
+              for (Long seq : sequenceNumbers) {
+                deleteLogFile(this.outputfiles.remove(seq), seq);
+              }
             }
           }
         }
+        this.numEntries = 0;
       }
-      this.numEntries = 0;
     } finally {
       this.cacheFlushLock.unlock();
     }
   }
   
-  private void deleteLogFile(final Path p, final Long seqno)
-  throws IOException {
+  private void deleteLogFile(final Path p, final Long seqno) throws IOException {
     LOG.info("removing old log file " + p.toString() +
       " whose highest sequence/edit id is " + seqno);
     this.fs.delete(p);
@@ -367,7 +364,7 @@ public class HLog implements HConstants {
    *
    * @throws IOException
    */
-  synchronized void closeAndDelete() throws IOException {
+  void closeAndDelete() throws IOException {
     close();
     fs.delete(dir);
   }
@@ -377,12 +374,19 @@ public class HLog implements HConstants {
    *
    * @throws IOException
    */
-  synchronized void close() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("closing log writer in " + this.dir.toString());
+  void close() throws IOException {
+    cacheFlushLock.lock();
+    try {
+      synchronized (updateLock) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("closing log writer in " + this.dir.toString());
+        }
+        this.writer.close();
+        this.closed = true;
+      }
+    } finally {
+      cacheFlushLock.unlock();
     }
-    this.writer.close();
-    this.closed = true;
   }
 
   /**
@@ -409,29 +413,36 @@ public class HLog implements HConstants {
    * @param timestamp
    * @throws IOException
    */
-  synchronized void append(Text regionName, Text tableName,
+  void append(Text regionName, Text tableName,
       TreeMap<HStoreKey, byte[]> edits) throws IOException {
     
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
-    long seqNum[] = obtainSeqNum(edits.size());
-    // The 'lastSeqWritten' map holds the sequence number of the oldest
-    // write for each region. When the cache is flushed, the entry for the
-    // region being flushed is removed if the sequence number of the flush
-    // is greater than or equal to the value in lastSeqWritten.
-    if (!this.lastSeqWritten.containsKey(regionName)) {
-      this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
+    synchronized (updateLock) {
+      long seqNum[] = obtainSeqNum(edits.size());
+      // The 'lastSeqWritten' map holds the sequence number of the oldest
+      // write for each region. When the cache is flushed, the entry for the
+      // region being flushed is removed if the sequence number of the flush
+      // is greater than or equal to the value in lastSeqWritten.
+      if (!this.lastSeqWritten.containsKey(regionName)) {
+        this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
+      }
+      int counter = 0;
+      for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
+        HStoreKey key = es.getKey();
+        HLogKey logKey =
+          new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
+        HLogEdit logEdit =
+          new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
+        this.writer.append(logKey, logEdit);
+        this.numEntries++;
+      }
     }
-    int counter = 0;
-    for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
-      HStoreKey key = es.getKey();
-      HLogKey logKey =
-        new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
-      HLogEdit logEdit =
-        new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
-      this.writer.append(logKey, logEdit);
-      this.numEntries++;
+    if (this.numEntries > this.maxlogentries) {
+      if (listener != null) {
+        listener.logRollRequested();
+      }
     }
   }
 
@@ -451,6 +462,11 @@ public class HLog implements HConstants {
     return value;
   }
 
+  /** @return the number of log files in use */
+  int getNumLogFiles() {
+    return outputfiles.size();
+  }
+
   /**
    * Obtain a specified number of sequence numbers
    *
@@ -487,43 +503,43 @@ public class HLog implements HConstants {
   /**
    * Complete the cache flush
    *
-   * Protected by this and cacheFlushLock
+   * Protected by cacheFlushLock
    *
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
-  synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId)
-  throws IOException {
+  void completeCacheFlush(final Text regionName, final Text tableName,
+      final long logSeqId) throws IOException {
+
     try {
       if (this.closed) {
         return;
       }
-      this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
-        new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
-          System.currentTimeMillis()));
-      this.numEntries++;
-      Long seq = this.lastSeqWritten.get(regionName);
-      if (seq != null && logSeqId >= seq.longValue()) {
-        this.lastSeqWritten.remove(regionName);
+      synchronized (updateLock) {
+        this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+            new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
+                System.currentTimeMillis()));
+        this.numEntries++;
+        Long seq = this.lastSeqWritten.get(regionName);
+        if (seq != null && logSeqId >= seq.longValue()) {
+          this.lastSeqWritten.remove(regionName);
+        }
       }
     } finally {
       this.cacheFlushLock.unlock();
-      notifyAll();              // wake up the log roller if it is waiting
     }
   }
 
   /**
-   * Abort a cache flush. This method will clear waits on
-   * {@link #insideCacheFlush}. Call if the flush fails. Note that the only
-   * recovery for an aborted flush currently is a restart of the regionserver so
-   * the snapshot content dropped by the failure gets restored to the memcache.
+   * Abort a cache flush.
+   * Call if the flush fails. Note that the only recovery for an aborted flush
+   * currently is a restart of the regionserver so the snapshot content dropped
+   * by the failure gets restored to the memcache.
    */
-  synchronized void abortCacheFlush() {
+  void abortCacheFlush() {
     this.cacheFlushLock.unlock();
-    notifyAll();
   }
 
   private static void usage() {
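
An aside on the cleanup logic in the rollWriter() hunk above: a closed log file becomes deletable once every region has flushed past that file's highest sequence id. Condensed to its core (same maps as in the diff; a sketch, not meant to compile in isolation):

// Sketch of the deletable-log computation in rollWriter() above.
// outputfiles:    highest sequence id in a closed log file -> its Path
// lastSeqWritten: region name -> sequence id of its oldest unflushed edit
long oldestOutstandingSeqNum = Collections.min(lastSeqWritten.values());
// headMap's bound is exclusive, so +1 selects logs whose final id is
// "older than or equal to" the oldest pending edit, per the diff's comment.
SortedMap<Long, Path> deletable =
  outputfiles.headMap(Long.valueOf(oldestOutstandingSeqNum + 1));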

File diff suppressed because it is too large
+ 356 - 334
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java


+ 6 - 5
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java

@@ -108,7 +108,8 @@ class HMerge implements HConstants {
       this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
       this.basedir = new Path(dir, "merge_" + System.currentTimeMillis());
       fs.mkdirs(basedir);
-      this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf);
+      this.hlog =
+        new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf, null);
     }
     
     void process() throws IOException {
@@ -150,11 +151,11 @@ class HMerge implements HConstants {
       for(int i = 0; i < regions.length - 1; i++) {
         if(currentRegion == null) {
           currentRegion =
-            new HRegion(dir, hlog, fs, conf, regions[i], null);
+            new HRegion(dir, hlog, fs, conf, regions[i], null, null);
           currentSize = currentRegion.largestHStore(midKey).getAggregate();
         }
         nextRegion =
-          new HRegion(dir, hlog, fs, conf, regions[i + 1], null);
+          new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null);
 
         nextSize = nextRegion.largestHStore(midKey).getAggregate();
 
@@ -327,7 +328,7 @@ class HMerge implements HConstants {
       // Scan root region to find all the meta regions
       
       HRegion root =
-        new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null);
+        new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
       HInternalScannerInterface rootScanner =
         root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
@@ -363,7 +364,7 @@ class HMerge implements HConstants {
         HRegion newRegion) throws IOException {
       
       HRegion root =
-        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null);
+        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
       Text[] regionsToDelete = {
           oldRegion1,

+ 55 - 58
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -90,7 +90,6 @@ public class HRegion implements HConstants {
   static final Random rand = new Random();
   static final Log LOG = LogFactory.getLog(HRegion.class);
   final AtomicBoolean closed = new AtomicBoolean(false);
-  private volatile long noFlushCount = 0;
 
   /**
    * Merge two HRegions.  They must be available on the current
@@ -159,7 +158,7 @@ public class HRegion implements HConstants {
     // Done
     // Construction moves the merge files into place under region.
     HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
-        newRegionDir);
+        newRegionDir, null);
 
     // Get rid of merges directory
 
@@ -221,9 +220,10 @@ public class HRegion implements HConstants {
   volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
+  private volatile long lastFlushTime;
+  final CacheFlushListener flushListener;
   final int blockingMemcacheSize;
   protected final long threadWakeFrequency;
-  protected final int optionalFlushCount;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Integer updateLock = new Integer(0);
   private final long desiredMaxFileSize;
@@ -251,11 +251,13 @@ public class HRegion implements HConstants {
    * @param regionInfo - HRegionInfo that describes the region
    * @param initialFiles If there are initial files (implying that the HRegion
    * is new), then read them from the supplied path.
+   * @param listener an object that implements CacheFlushListener or null
    * 
    * @throws IOException
    */
   public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf, 
-      HRegionInfo regionInfo, Path initialFiles) throws IOException {
+      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+    throws IOException {
     
     this.rootDir = rootDir;
     this.log = log;
@@ -265,8 +267,6 @@ public class HRegion implements HConstants {
     this.encodedRegionName =
       HRegionInfo.encodeRegionName(this.regionInfo.getRegionName());
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.optionalFlushCount =
-      conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
 
     // Declare the regionName.  This is a unique string for the region, used to 
     // build a unique filename.
@@ -314,6 +314,7 @@ public class HRegion implements HConstants {
     // By default, we flush the cache when 16M.
     this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
         1024*1024*16);
+    this.flushListener = listener;
     this.blockingMemcacheSize = this.memcacheFlushSize *
       conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
 
@@ -323,6 +324,7 @@ public class HRegion implements HConstants {
 
     // HRegion is ready to go!
     this.writestate.compacting = false;
+    this.lastFlushTime = System.currentTimeMillis();
     LOG.info("region " + this.regionInfo.getRegionName() + " available");
   }
   
@@ -485,6 +487,11 @@ public class HRegion implements HConstants {
     return this.fs;
   }
 
+  /** @return the last time the region was flushed */
+  public long getLastFlushTime() {
+    return this.lastFlushTime;
+  }
+  
   //////////////////////////////////////////////////////////////////////////////
   // HRegion maintenance.  
   //
@@ -598,8 +605,10 @@ public class HRegion implements HConstants {
     // Done!
     // Opening the region copies the splits files from the splits directory
     // under each region.
-    HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
-    HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
+    HRegion regionA =
+      new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
+    HRegion regionB =
+      new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
 
     // Cleanup
     boolean deleted = fs.delete(splits);    // Get rid of splits directory
@@ -751,54 +760,30 @@ public class HRegion implements HConstants {
   }
 
   /**
-   * Flush the cache if necessary. This is called periodically to minimize the
-   * amount of log processing needed upon startup.
+   * Flush the cache.
    * 
-   * <p>The returned Vector is a list of all the files used by the component
-   * HStores. It is a list of HStoreFile objects.  If the returned value is
-   * NULL, then the flush could not be executed, because the HRegion is busy
-   * doing something else storage-intensive.  The caller should check back
-   * later.
+   * When this method is called the cache will be flushed unless:
+   * <ol>
+   *   <li>the cache is empty</li>
+   *   <li>the region is closed.</li>
+   *   <li>a flush is already in progress</li>
+   *   <li>writes are disabled</li>
+   * </ol>
    *
    * <p>This method may block for some time, so it should not be called from a 
    * time-sensitive thread.
    * 
-   * @param disableFutureWrites indicates that the caller intends to 
-   * close() the HRegion shortly, so the HRegion should not take on any new and 
-   * potentially long-lasting disk operations. This flush() should be the final
-   * pre-close() disk operation.
+   * @return true if cache was flushed
+   * 
    * @throws IOException
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void flushcache() throws IOException {
+  boolean flushcache() throws IOException {
     lock.readLock().lock();                      // Prevent splits and closes
     try {
       if (this.closed.get()) {
-        return;
-      }
-      boolean needFlush = false;
-      long memcacheSize = this.memcacheSize.get();
-      if(memcacheSize > this.memcacheFlushSize) {
-        needFlush = true;
-      } else if (memcacheSize > 0) {
-        if (this.noFlushCount >= this.optionalFlushCount) {
-          LOG.info("Optional flush called " + this.noFlushCount +
-          " times when data present without flushing.  Forcing one.");
-          needFlush = true;
-        } else {
-          // Only increment if something in the cache.
-          // Gets zero'd when a flushcache is called.
-          this.noFlushCount++;
-        }
-      }
-      if (!needFlush) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cache flush not needed for region " +
-              regionInfo.getRegionName() + ". Cache size=" + memcacheSize +
-              ", cache flush threshold=" + this.memcacheFlushSize);
-        }
-        return;
+        return false;
       }
       synchronized (writestate) {
         if ((!writestate.flushing) && writestate.writesEnabled) {
@@ -811,16 +796,15 @@ public class HRegion implements HConstants {
                 writestate.flushing + ", writesEnabled=" +
                 writestate.writesEnabled);
           }
-          return;  
+          return false;  
         }
       }
-      this.noFlushCount = 0;
       long startTime = -1;
       synchronized (updateLock) {// Stop updates while we snapshot the memcaches
         startTime = snapshotMemcaches();
       }
       try {
-        internalFlushcache(startTime);
+        return internalFlushcache(startTime);
       } finally {
         synchronized (writestate) {
           writestate.flushing = false;
@@ -835,7 +819,7 @@ public class HRegion implements HConstants {
   /*
    * It is assumed that updates are blocked for the duration of this method
    */
-  long snapshotMemcaches() {
+  private long snapshotMemcaches() {
     if (this.memcacheSize.get() == 0) {
       return -1;
     }
@@ -883,17 +867,24 @@ public class HRegion implements HConstants {
    * routes.
    * 
    * <p> This method may block for some time.
+   * 
+   * @param startTime the time the cache was snapshotted or -1 if a flush is
+   * not needed
+   * 
+   * @return true if the cache was flushed
+   * 
    * @throws IOException
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void internalFlushcache(long startTime) throws IOException {
+  private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache: snapshotMemcaches() determined that " +
-            "there was nothing to do");
+        LOG.debug("Not flushing cache for region " +
+            regionInfo.getRegionName() +
+            ": snapshotMemcaches() determined that there was nothing to do");
       }
-      return;
+      return false;
     }
 
     // We pass the log to the HMemcache, so we can lock down both
@@ -914,7 +905,6 @@ public class HRegion implements HConstants {
     // Otherwise, the snapshot content while backed up in the hlog, it will not
     // be part of the current running servers state.
 
-    long logCacheFlushId = sequenceId;
     try {
       // A.  Flush memcache to all the HStores.
       // Keep running vector of all store files that includes both old and the
@@ -938,7 +928,7 @@ public class HRegion implements HConstants {
     //     and that all updates to the log for this regionName that have lower 
     //     log-sequence-ids can be safely ignored.
     this.log.completeCacheFlush(this.regionInfo.getRegionName(),
-        regionInfo.getTableDesc().getName(), logCacheFlushId);
+        regionInfo.getTableDesc().getName(), sequenceId);
 
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
@@ -948,8 +938,10 @@ public class HRegion implements HConstants {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Finished memcache flush for region " +
           this.regionInfo.getRegionName() + " in " +
-          (System.currentTimeMillis() - startTime) + "ms");
+          (System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
+          sequenceId);
     }
+    return true;
   }
   
   //////////////////////////////////////////////////////////////////////////////
@@ -1309,13 +1301,18 @@ public class HRegion implements HConstants {
       this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), updatesByColumn);
 
+      long memcacheSize = 0;
       for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
         HStoreKey key = e.getKey();
         byte[] val = e.getValue();
-        this.memcacheSize.addAndGet(key.getSize() +
+        memcacheSize = this.memcacheSize.addAndGet(key.getSize() +
             (val == null ? 0 : val.length));
         stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
       }
+      if (this.flushListener != null && memcacheSize > this.memcacheFlushSize) {
+        // Request a cache flush
+        this.flushListener.flushRequested(this);
+      }
     }
   }
 
@@ -1582,8 +1579,8 @@ public class HRegion implements HConstants {
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     return new HRegion(rootDir,
-      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
-      fs, conf, info, initialFiles);
+      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+      fs, conf, info, initialFiles, null);
   }
   
   /**
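
Two things in the HRegion diff are worth pulling out. Flushes are now pushed rather than polled: the update path itself notifies the listener the moment the running memcache size crosses memcacheFlushSize, which is why the optionalflushcount/noFlushCount bookkeeping could be deleted. And flushcache() now reports whether anything was actually written, so a caller can chain follow-up work only after a real flush; a usage sketch (names as in the HRegionServer diff below):

// Usage sketch: request a compaction only when the flush actually ran.
// flushcache() returns false if the cache is empty, the region is closed,
// a flush is already in progress, or writes are disabled.
if (entry.getRegion().flushcache()) {
  compactor.compactionRequested(entry);
}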

+ 18 - 8
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java

@@ -78,7 +78,18 @@ public class HRegionInfo implements WritableComparable {
   private boolean split;
   private Text startKey;
   private HTableDescriptor tableDesc;
-
+  private int hashCode;
+  
+  private void setHashCode() {
+    int result = this.regionName.hashCode();
+    result ^= Long.valueOf(this.regionId).hashCode();
+    result ^= this.startKey.hashCode();
+    result ^= this.endKey.hashCode();
+    result ^= Boolean.valueOf(this.offLine).hashCode();
+    result ^= this.tableDesc.hashCode();
+    this.hashCode = result;
+  }
+  
   /** Used to construct the HRegionInfo for the root and first meta regions */
   private HRegionInfo(long regionId, HTableDescriptor tableDesc) {
     this.regionId = regionId;
@@ -89,6 +100,7 @@ public class HRegionInfo implements WritableComparable {
         DELIMITER + regionId);
     this.split = false;
     this.startKey = new Text();
+    setHashCode();
   }
 
   /** Default constructor - creates empty object */
@@ -100,6 +112,7 @@ public class HRegionInfo implements WritableComparable {
     this.split = false;
     this.startKey = new Text();
     this.tableDesc = new HTableDescriptor();
+    this.hashCode = 0;
   }
   
   /**
@@ -152,6 +165,7 @@ public class HRegionInfo implements WritableComparable {
     }
     
     this.tableDesc = tableDesc;
+    setHashCode();
   }
   
   /** @return the endKey */
@@ -232,13 +246,7 @@ public class HRegionInfo implements WritableComparable {
    */
   @Override
   public int hashCode() {
-    int result = this.regionName.hashCode();
-    result ^= Long.valueOf(this.regionId).hashCode();
-    result ^= this.startKey.hashCode();
-    result ^= this.endKey.hashCode();
-    result ^= Boolean.valueOf(this.offLine).hashCode();
-    result ^= this.tableDesc.hashCode();
-    return result;
+    return this.hashCode;
   }
 
   //
@@ -256,6 +264,7 @@ public class HRegionInfo implements WritableComparable {
     out.writeBoolean(split);
     startKey.write(out);
     tableDesc.write(out);
+    out.writeInt(hashCode);
   }
   
   /**
@@ -269,6 +278,7 @@ public class HRegionInfo implements WritableComparable {
     this.split = in.readBoolean();
     this.startKey.readFields(in);
     this.tableDesc.readFields(in);
+    this.hashCode = in.readInt();
   }
   
   //
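
Caching the hash is not just a micro-optimization here: the new HRegionServer queues key QueueEntry equality and hashing off HRegionInfo, so hashCode() runs on every contains()/remove() against the flush queue. Note the cached value is also serialized (writeInt/readInt above) so deserialized copies carry it. The pattern in isolation, on a hypothetical immutable class:

// The cached-hashCode pattern used above, shown on an illustrative
// immutable class (Point is not part of the commit).
final class Point {
  private final int x, y;
  private final int hash;            // computed once at construction

  Point(int x, int y) {
    this.x = x;
    this.y = y;
    this.hash = 31 * x + y;          // any stable mix of the final fields
  }

  @Override
  public int hashCode() {
    return hash;                     // O(1) on every call
  }

  @Override
  public boolean equals(Object o) {
    return o instanceof Point && ((Point) o).x == x && ((Point) o).y == y;
  }
}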

+ 352 - 133
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

@@ -26,15 +26,19 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,27 +129,79 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   /** region server process name */
   public static final String REGIONSERVER = "regionserver";
 
+  /** Queue entry passed to flusher, compactor and splitter threads */
+  class QueueEntry implements Delayed {
+    private final HRegion region;
+    private long expirationTime;
+
+    QueueEntry(HRegion region, long expirationTime) {
+      this.region = region;
+      this.expirationTime = expirationTime;
+    }
+    
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+      QueueEntry other = (QueueEntry) o;
+      return this.hashCode() == other.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+      return this.region.getRegionInfo().hashCode();
+    }
+
+    /** {@inheritDoc} */
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.expirationTime - System.currentTimeMillis(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    /** {@inheritDoc} */
+    public int compareTo(Delayed o) {
+      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
+        o.getDelay(TimeUnit.MILLISECONDS);
+
+      int value = 0;
+      if (delta > 0) {
+        value = 1;
+        
+      } else if (delta < 0) {
+        value = -1;
+      }
+      return value;
+    }
+
+    /** @return the region */
+    public HRegion getRegion() {
+      return region;
+    }
+
+    /** @param expirationTime the expirationTime to set */
+    public void setExpirationTime(long expirationTime) {
+      this.expirationTime = expirationTime;
+    }
+  }
+  
   // Check to see if regions should be split
-  private final Thread splitOrCompactCheckerThread;
+  final Splitter splitter;
   // Needed at shutdown. On way out, if can get this lock then we are not in
   // middle of a split or compaction: i.e. splits/compactions cannot be
   // interrupted.
-  protected final Integer splitOrCompactLock = new Integer(0);
+  final Integer splitterLock = new Integer(0);
   
-  /*
-   * Runs periodically to determine if regions need to be compacted or split
-   */
-  class SplitOrCompactChecker extends Chore
-  implements RegionUnavailableListener {
+  /** Split regions on request */
+  class Splitter extends Thread implements RegionUnavailableListener {
+    private final BlockingQueue<QueueEntry> splitQueue =
+      new LinkedBlockingQueue<QueueEntry>();
+
     private HTable root = null;
     private HTable meta = null;
 
-    /**
-     * @param stop
-     */
-    public SplitOrCompactChecker(final AtomicBoolean stop) {
-      super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
-        30 * 1000), stop);
+    /** constructor */
+    public Splitter() {
+      super();
     }
 
     /** {@inheritDoc} */
@@ -178,35 +234,50 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
     
     /**
-     * Scan for splits or compactions to run.  Run any we find.
+     * Perform region splits if necessary
      */
     @Override
-    protected void chore() {
-      // Don't interrupt us while we're working
-      synchronized (splitOrCompactLock) {
-        checkForSplitsOrCompactions();
-      }
-    }
-    
-    private void checkForSplitsOrCompactions() {
-      // Grab a list of regions to check
-      List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
-      for(HRegion cur: nonClosedRegionsToCheck) {
+    public void run() {
+      while (!stopRequested.get()) {
+        QueueEntry e = null;
         try {
-          if (cur.compactIfNeeded()) {
-            // After compaction, it probably needs splitting.  May also need
-            // splitting just because one of the memcache flushes was big.
-            split(cur);
-          }
+          e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        if (e == null) {
+          continue;
+        }
+        synchronized (splitterLock) { // Don't interrupt us while we're working
+          try {
+            split(e.getRegion());
+            
+          } catch (IOException ex) {
+            LOG.error("Split failed for region " +
+                e.getRegion().getRegionName(),
+                RemoteExceptionHandler.checkIOException(ex));
+            if (!checkFileSystem()) {
+              break;
+            }
 
-        } catch(IOException e) {
-          //TODO: What happens if this fails? Are we toast?
-          LOG.error("Split or compaction failed", e);
-          if (!checkFileSystem()) {
-            break;
+          } catch (Exception ex) {
+            LOG.error("Split failed on region " +
+                e.getRegion().getRegionName(), ex);
+            if (!checkFileSystem()) {
+              break;
+            }
           }
         }
       }
+      LOG.info(getName() + " exiting");
+    }
+    
+    /**
+     * @param e entry indicating which region needs to be split
+     */
+    public void splitRequested(QueueEntry e) {
+      splitQueue.add(e);
     }
     
     private void split(final HRegion region) throws IOException {
@@ -271,100 +342,240 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
 
-  // Cache flushing  
-  private final Thread cacheFlusherThread;
+  // Compactions
+  final Compactor compactor;
   // Needed during shutdown so we send an interrupt after completion of a
-  // flush, not in the midst.
-  protected final Integer cacheFlusherLock = new Integer(0);
-  
-  /* Runs periodically to flush memcache.
-   */
-  class Flusher extends Chore {
-    /**
-     * @param period
-     * @param stop
-     */
-    public Flusher(final int period, final AtomicBoolean stop) {
-      super(period, stop);
+  // compaction, not in the midst.
+  final Integer compactionLock = new Integer(0);
+
+  /** Compact region on request */
+  class Compactor extends Thread {
+    private final BlockingQueue<QueueEntry> compactionQueue =
+      new LinkedBlockingQueue<QueueEntry>();
+
+    /** constructor */
+    public Compactor() {
+      super();
     }
     
+    /** {@inheritDoc} */
     @Override
-    protected void chore() {
-      synchronized(cacheFlusherLock) {
-        checkForFlushesToRun();
-      }
-    }
-    
-    private void checkForFlushesToRun() {
-      // Grab a list of items to flush
-      List<HRegion> nonClosedRegionsToFlush = getRegionsToCheck();
-      // Flush them, if necessary
-      for(HRegion cur: nonClosedRegionsToFlush) {
+    public void run() {
+      while (!stopRequested.get()) {
+        QueueEntry e = null;
+        try {
+          e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        if (e == null) {
+          continue;
+        }
         try {
-          cur.flushcache();
-        } catch (DroppedSnapshotException e) {
-          // Cache flush can fail in a few places.  If it fails in a critical
-          // section, we get a DroppedSnapshotException and a replay of hlog
-          // is required. Currently the only way to do this is a restart of
-          // the server.
-          LOG.fatal("Replay of hlog required. Forcing server restart", e);
+          if (e.getRegion().compactIfNeeded()) {
+            splitter.splitRequested(e);
+          }
+          
+        } catch (IOException ex) {
+          LOG.error("Compaction failed for region " +
+              e.getRegion().getRegionName(),
+              RemoteExceptionHandler.checkIOException(ex));
           if (!checkFileSystem()) {
             break;
           }
-          HRegionServer.this.stop();
-        } catch (IOException iex) {
-          LOG.error("Cache flush failed",
-            RemoteExceptionHandler.checkIOException(iex));
+
+        } catch (Exception ex) {
+          LOG.error("Compaction failed for region " +
+              e.getRegion().getRegionName(), ex);
           if (!checkFileSystem()) {
             break;
           }
         }
       }
+      LOG.info(getName() + " exiting");
+    }
+    
+    /**
+     * @param e QueueEntry for region to be compacted
+     */
+    public void compactionRequested(QueueEntry e) {
+      compactionQueue.add(e);
+    }
+  }
+  
+  // Cache flushing  
+  final Flusher cacheFlusher;
+  // Needed during shutdown so we send an interrupt after completion of a
+  // flush, not in the midst.
+  final Integer cacheFlusherLock = new Integer(0);
+  
+  /** Flush cache upon request */
+  class Flusher extends Thread implements CacheFlushListener {
+    private final DelayQueue<QueueEntry> flushQueue =
+      new DelayQueue<QueueEntry>();
+
+    private final long optionalFlushPeriod;
+    
+    /** constructor */
+    public Flusher() {
+      super();
+      this.optionalFlushPeriod = conf.getLong(
+          "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L);
+
+    }
+    
+    /** {@inheritDoc} */
+    @Override
+    public void run() {
+      while (!stopRequested.get()) {
+        QueueEntry e = null;
+        try {
+          e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          
+        } catch (InterruptedException ex) {
+          continue;
+
+        } catch (ConcurrentModificationException ex) {
+          continue;
+          
+        }
+        synchronized(cacheFlusherLock) { // Don't interrupt while we're working
+          if (e != null) {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("flushing region " + e.getRegion().getRegionName());
+              }
+              if (e.getRegion().flushcache()) {
+                compactor.compactionRequested(e);
+              }
+              
+            } catch (DroppedSnapshotException ex) {
+              // Cache flush can fail in a few places.  If it fails in a critical
+              // section, we get a DroppedSnapshotException and a replay of hlog
+              // is required. Currently the only way to do this is a restart of
+              // the server.
+              LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+              if (!checkFileSystem()) {
+                break;
+              }
+              HRegionServer.this.stop();
+              
+            } catch (IOException ex) {
+              LOG.error("Cache flush failed for region " +
+                  e.getRegion().getRegionName(),
+                  RemoteExceptionHandler.checkIOException(ex));
+              if (!checkFileSystem()) {
+                break;
+              }
+
+            } catch (Exception ex) {
+              LOG.error("Cache flush failed for region " +
+                  e.getRegion().getRegionName(), ex);
+              if (!checkFileSystem()) {
+                break;
+              }
+            }
+            e.setExpirationTime(System.currentTimeMillis() +
+                optionalFlushPeriod);
+            flushQueue.add(e);
+          }
+          
+          // Now insure that all the active regions are in the queue
+          
+          Set<HRegion> regions = getRegionsToCheck();
+          for (HRegion r: regions) {
+            e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
+            synchronized (flushQueue) {
+              if (!flushQueue.contains(e)) {
+                flushQueue.add(e);
+              }
+            }
+          }
+
+          // Now make sure that the queue only contains active regions
+
+          synchronized (flushQueue) {
+            for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext();  ) {
+              e = i.next();
+              if (!regions.contains(e.getRegion())) {
+                i.remove();
+              }
+            }
+          }
+        }
+      }
+      flushQueue.clear();
+      LOG.info(getName() + " exiting");
+    }
+    
+    /** {@inheritDoc} */
+    public void flushRequested(HRegion region) {
+      QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
+      synchronized (flushQueue) {
+        if (flushQueue.contains(e)) {
+          flushQueue.remove(e);
+        }
+        flushQueue.add(e);
+      }
     }
   }
 
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected HLog log;
-  private final Thread logRollerThread;
-  protected final Integer logRollerLock = new Integer(0);
+  final LogRoller logRoller;
+  final Integer logRollerLock = new Integer(0);
   
   /** Runs periodically to determine if the HLog should be rolled */
-  class LogRoller extends Chore {
-    private int MAXLOGENTRIES =
-      conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
+  class LogRoller extends Thread implements LogRollListener {
+    private volatile boolean rollLog;
     
-    /**
-     * @param period
-     * @param stop
-     */
-    public LogRoller(final int period, final AtomicBoolean stop) {
-      super(period, stop);
+    /** constructor */
+    public LogRoller() {
+      super();
+      this.rollLog = false;
     }
  
     /** {@inheritDoc} */
     @Override
-    protected void chore() {
-      synchronized(logRollerLock) {
-        checkForLogRoll();
-      }
-    }
-
-    private void checkForLogRoll() {
-      // If the number of log entries is high enough, roll the log.  This
-      // is a very fast operation, but should not be done too frequently.
-      int nEntries = log.getNumEntries();
-      if(nEntries > this.MAXLOGENTRIES) {
+    public synchronized void run() {
+      while (!stopRequested.get()) {
         try {
-          LOG.info("Rolling hlog. Number of entries: " + nEntries);
-          log.rollWriter();
-        } catch (IOException iex) {
-          LOG.error("Log rolling failed",
-            RemoteExceptionHandler.checkIOException(iex));
-          checkFileSystem();
+          this.wait(threadWakeFrequency);
+          
+        } catch (InterruptedException e) {
+          continue;
+        }
+        if (!rollLog) {
+          continue;
+        }
+        synchronized (logRollerLock) {
+          try {
+            LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries());
+            log.rollWriter();
+            
+          } catch (IOException ex) {
+            LOG.error("Log rolling failed",
+              RemoteExceptionHandler.checkIOException(ex));
+            checkFileSystem();
+            
+          } catch (Exception ex) {
+            LOG.error("Log rolling failed", ex);
+            checkFileSystem();
+            
+          } finally {
+            rollLog = false;
+          }
         }
       }
     }
+
+    /** {@inheritDoc} */
+    public synchronized void logRollRequested() {
+      rollLog = true;
+      this.notifyAll();
+    }
   }
 
   /**
@@ -396,20 +607,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     this.serverLeaseTimeout =
       conf.getInt("hbase.master.lease.period", 30 * 1000);
 
-    // Cache flushing chore thread.
-    this.cacheFlusherThread =
-      new Flusher(this.threadWakeFrequency, stopRequested);
+    // Cache flushing thread.
+    this.cacheFlusher = new Flusher();
+    
+    // Compaction thread
+    this.compactor = new Compactor();
     
-    // Check regions to see if they need to be split or compacted chore thread
-    this.splitOrCompactCheckerThread =
-      new SplitOrCompactChecker(this.stopRequested);
+    // Region split thread
+    this.splitter = new Splitter();
     
+    // Log rolling thread
+    this.logRoller = new LogRoller();
+
     // Task thread to process requests from Master
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
     this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
-    this.logRollerThread =
-      new LogRoller(this.threadWakeFrequency, stopRequested);
     // Server to handle client requests
     this.server = RPC.getServer(this, address.getBindAddress(), 
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -557,14 +770,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
-    synchronized(logRollerLock) {
-      this.logRollerThread.interrupt();
-    }
     synchronized(cacheFlusherLock) {
-      this.cacheFlusherThread.interrupt();
+      this.cacheFlusher.interrupt();
+    }
+    synchronized (compactionLock) {
+      this.compactor.interrupt();
     }
-    synchronized(splitOrCompactLock) {
-      this.splitOrCompactCheckerThread.interrupt();
+    synchronized (splitterLock) {
+      this.splitter.interrupt();
+    }
+    synchronized (logRollerLock) {
+      this.logRoller.interrupt();
     }
 
     if (abortRequested) {
@@ -657,7 +873,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         "running at " + this.serverInfo.getServerAddress().toString() +
         " because logdir " + logdir.toString() + " exists");
     }
-    return new HLog(fs, logdir, conf);
+    return new HLog(fs, logdir, conf, logRoller);
   }
   
   /*
@@ -680,16 +896,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         LOG.fatal("Set stop flag in " + t.getName(), e);
       }
     };
-    Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher",
-      handler);
-    Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread,
-      n + ".splitOrCompactChecker", handler);
-    Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller",
+    Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
+        handler);
+    Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
-    // Worker is not the same as the above threads in that it does not
-    // inherit from Chore.  Set an UncaughtExceptionHandler on it in case its
-    // the one to see an OOME, etc., first.  The handler will set the stop
-    // flag.
+    Threads.setDaemonThreadRunning(this.compactor, n + ".compactor",
+        handler);
+    Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler);
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
     // an unhandled exception, it will just exit.
@@ -752,9 +965,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    */
   void join() {
     join(this.workerThread);
-    join(this.logRollerThread);
-    join(this.cacheFlusherThread);
-    join(this.splitOrCompactCheckerThread);
+    join(this.logRoller);
+    join(this.cacheFlusher);
+    join(this.compactor);
+    join(this.splitter);
   }
 
   private void join(final Thread t) {
@@ -925,7 +1139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     HRegion region = onlineRegions.get(regionInfo.getRegionName());
     if(region == null) {
       region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
-        this.log, FileSystem.get(conf), conf, regionInfo, null);
+        this.log, FileSystem.get(conf), conf, regionInfo, null,
+        this.cacheFlusher);
       this.lock.writeLock().lock();
       try {
         this.log.setSequenceNumber(region.getMinSequenceId());
@@ -1226,6 +1441,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   public AtomicInteger getRequestCount() {
     return this.requestCount;
   }
+
+  /** @return reference to CacheFlushListener */
+  public CacheFlushListener getCacheFlushListener() {
+    return this.cacheFlusher;
+  }
   
   /** 
    * Protected utility method for safely obtaining an HRegion handle.
@@ -1318,8 +1538,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * @return Returns list of non-closed regions hosted on this server.  If no
    * regions to check, returns an empty list.
    */
-  protected List<HRegion> getRegionsToCheck() {
-    ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
+  protected Set<HRegion> getRegionsToCheck() {
+    HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
     //TODO: is this locking necessary? 
     lock.readLock().lock();
     try {
@@ -1328,8 +1548,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       lock.readLock().unlock();
     }
     // Purge closed regions.
-    for (final ListIterator<HRegion> i = regionsToCheck.listIterator();
-        i.hasNext();) {
+    for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
       HRegion r = i.next();
       if (r.isClosed()) {
         i.remove();

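All of the worker threads above are started through the same daemon-plus-handler pattern. Below is a minimal, self-contained sketch of that pattern, assuming Threads.setDaemonThreadRunning names the thread, installs the uncaught-exception handler, marks the thread daemon, and starts it; the class and thread names here are hypothetical, not part of the patch:

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.atomic.AtomicBoolean;

public class DaemonChoreSketch {
  public static void main(String[] args) throws InterruptedException {
    final AtomicBoolean stopRequested = new AtomicBoolean(false);
    // Mirrors the handler above: an uncaught OOME or other error in any
    // worker sets the stop flag so the whole server winds down.
    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
      public void uncaughtException(Thread t, Throwable e) {
        stopRequested.set(true);
      }
    };
    Thread chore = new Thread(new Runnable() {
      public void run() {
        while (!stopRequested.get()) {
          try {
            Thread.sleep(100);          // periodic chore body would run here
          } catch (InterruptedException e) {
            return;                     // interrupt is the shutdown signal
          }
        }
      }
    });
    chore.setName("regionserver.choreSketch");
    chore.setUncaughtExceptionHandler(handler);
    chore.setDaemon(true);
    chore.start();
    chore.interrupt();                  // same shutdown path as stop above
    chore.join();
  }
}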
+ 29 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/LogRollListener.java

@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Mechanism by which the HLog requests a log roll
+ */
+public interface LogRollListener {
+  /** Request that the log be rolled */
+  public void logRollRequested();
+}

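A hypothetical implementation, for illustration only: the flag, the wait/notify handshake, and the class name are assumptions; only the LogRollListener interface and HLog.rollWriter() (which the tests below suggest can throw IOException) come from this patch:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

class LogRollerSketch extends Thread implements LogRollListener {
  private final AtomicBoolean rollRequested = new AtomicBoolean(false);
  private final HLog log;

  LogRollerSketch(final HLog log) {
    this.log = log;
  }

  /** Called by the HLog when it wants its writer rolled. */
  public void logRollRequested() {
    rollRequested.set(true);
    synchronized (this) {
      notifyAll();                      // wake the roller thread promptly
    }
  }

  public void run() {
    while (!isInterrupted()) {
      synchronized (this) {
        while (!rollRequested.get()) {
          try {
            wait();                     // block until a roll is requested
          } catch (InterruptedException e) {
            return;
          }
        }
      }
      rollRequested.set(false);         // clear before rolling so a request
                                        // arriving mid-roll is not lost
      try {
        log.rollWriter();
      } catch (IOException e) {
        // a real roller would log the failure and decide whether to abort
      }
    }
  }
}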
+ 5 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Sleeper.java

@@ -31,6 +31,10 @@ public class Sleeper {
   private final int period;
   private AtomicBoolean stop;
   
+  /**
+   * @param sleep period to sleep for, in milliseconds
+   * @param stop flag that, when set, ends any sleep in progress
+   */
   public Sleeper(final int sleep, final AtomicBoolean stop) {
     this.period = sleep;
     this.stop = stop;
@@ -40,7 +44,7 @@ public class Sleeper {
    * Sleep for period.
    */
   public void sleep() {
-    sleep(System.currentTimeMillis());
+    sleep(period);
   }
   
   /**

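Typical use is a chore loop that does one round of work per period. A small sketch follows; the class name and loop body are hypothetical, while the constructor and sleep() calls are as shown above:

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.util.Sleeper;

public class SleeperUsageSketch {
  public static void main(String[] args) {
    AtomicBoolean stop = new AtomicBoolean(false);
    Sleeper sleeper = new Sleeper(1000, stop);  // 1-second period
    for (int round = 0; !stop.get(); round++) {
      // one round of periodic work would run here
      if (round == 2) {
        stop.set(true);                         // stop after three rounds
      }
      sleeper.sleep();                          // wait out the period
    }
  }
}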
+ 10 - 1
src/contrib/hbase/src/test/hbase-site.xml

@@ -103,8 +103,17 @@
     the master will notice a dead region server sooner. The default is 15 seconds.
     </description>
   </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>10000</value>
+    <description>
+    Amount of time to wait after a region was last flushed before an
+    optional cache flush is invoked. The default is 60,000 ms; this test
+    configuration shortens it to 10,000 ms.
+    </description>
+  </property>
   <property>
   	<name>hbase.rootdir</name>
   	<value>/hbase</value>
-  	<description>location of HBase instance in dfs</description></property>
+  	<description>location of HBase instance in dfs</description>
+  </property>
 </configuration>

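Tests that need a different interval can also set it programmatically on their configuration before the cluster starts, in the same way other intervals are tuned in the tests below; a sketch, using the same 10-second value as this test override:

// Shorten the optional cache flush interval to 10 seconds.
conf.setInt("hbase.regionserver.optionalcacheflushinterval", 10 * 1000);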
+ 3 - 3
src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java

@@ -123,8 +123,8 @@ public abstract class HBaseTestCase extends TestCase {
     FileSystem fs = dir.getFileSystem(c);
     fs.mkdirs(regionDir);
     return new HRegion(dir,
-      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
-      fs, conf, info, null);
+      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf,
+          null), fs, conf, info, null, null);
   }
   
   protected HTableDescriptor createTableDescriptor(final String name) {
@@ -365,7 +365,7 @@ public abstract class HBaseTestCase extends TestCase {
       return region.getFull(row);
     }
     public void flushcache() throws IOException {
-      this.region.internalFlushcache(this.region.snapshotMemcaches());
+      this.region.flushcache();
     }
   }
 

+ 1 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java

@@ -257,7 +257,7 @@ public class MiniHBaseCluster implements HConstants {
     for (LocalHBaseCluster.RegionServerThread t:
         this.hbaseCluster.getRegionServers()) {
       for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
-        r.internalFlushcache(r.snapshotMemcaches());
+        r.flushcache();
       }
     }
   }

+ 4 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java

@@ -103,9 +103,11 @@ public class MultiRegionTable extends HBaseTestCase {
       }
     }
 
-    // Flush will provoke a split next time the split-checker thread runs.
-    r.internalFlushcache(r.snapshotMemcaches());
+    // Request a cache flush; the flush will provoke a split the next
+    // time the split checker runs.
     
+    cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener().
+      flushRequested(r);
+
     // Now, wait until split makes it into the meta table.
     int oldCount = count;
     for (int i = 0; i < retries;  i++) {

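The same event-driven flush request works from any test holding a region server reference. The following sketch combines calls that appear elsewhere in this patch (getCacheFlushListener above, getOnlineRegions in TestLogRolling below); the asynchronous, non-blocking behavior is assumed:

HRegionServer server =
  cluster.getRegionThreads().get(0).getRegionServer();
for (HRegion r: server.getOnlineRegions().values()) {
  server.getCacheFlushListener().flushRequested(r);  // assumed asynchronous
}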
+ 3 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java

@@ -54,10 +54,11 @@ public class TestCompaction extends HBaseTestCase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    this.hlog = new HLog(this.localFs, this.testDir, this.conf);
+    this.hlog = new HLog(this.localFs, this.testDir, this.conf, null);
     HTableDescriptor htd = createTableDescriptor(getName());
     HRegionInfo hri = new HRegionInfo(htd, null, null);
-    this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+    this.r =
+      new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null);
   }
   
   /** {@inheritDoc} */

+ 4 - 4
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java

@@ -93,9 +93,9 @@ public class TestGet extends HBaseTestCase {
           HRegionInfo.encodeRegionName(info.getRegionName()));
       fs.mkdirs(regionDir);
       
-      HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
+      HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null);
 
-      HRegion region = new HRegion(dir, log, fs, conf, info, null);
+      HRegion region = new HRegion(dir, log, fs, conf, info, null, null);
       HRegionIncommon r = new HRegionIncommon(region);
       
       // Write information to the table
@@ -135,7 +135,7 @@ public class TestGet extends HBaseTestCase {
       
       region.close();
       log.rollWriter();
-      region = new HRegion(dir, log, fs, conf, info, null);
+      region = new HRegion(dir, log, fs, conf, info, null, null);
       r = new HRegionIncommon(region);
       
       // Read it back
@@ -164,7 +164,7 @@ public class TestGet extends HBaseTestCase {
       
       region.close();
       log.rollWriter();
-      region = new HRegion(dir, log, fs, conf, info, null);
+      region = new HRegion(dir, log, fs, conf, info, null, null);
       r = new HRegionIncommon(region);
 
       // Read it back

+ 5 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java

@@ -45,6 +45,10 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
     this.table = null;
     Logger.getRootLogger().setLevel(Level.INFO);
 
+    // Make the thread wake frequency a little slower so other threads
+    // can run
+    conf.setInt("hbase.server.thread.wakefrequency", 2000);
+    
     // Make lease timeout longer, lease checks less frequent
     conf.setInt("hbase.master.lease.period", 10 * 1000);
     conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
@@ -112,7 +116,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
       Text rowlabel = new Text("row_" + k);
 
       byte bodydata[] = table.get(rowlabel, CONTENTS_BASIC);
-      assertNotNull(bodydata);
+      assertNotNull("no data for row " + rowlabel, bodydata);
       String bodystr = new String(bodydata, HConstants.UTF8_ENCODING).trim();
       String teststr = CONTENTSTR + k;
       assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC

+ 1 - 1
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHLog.java

@@ -45,7 +45,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
     final Text tableName = new Text("tablename");
     final Text row = new Text("row");
     Reader reader = null;
-    HLog log = new HLog(fs, dir, this.conf);
+    HLog log = new HLog(fs, dir, this.conf, null);
     try {
       // Write columns named 1, 2, 3, etc. and then values of single byte
       // 1, 2, 3...

+ 2 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java

@@ -98,12 +98,12 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
     fs.mkdirs(parentdir);
     newlogdir = new Path(parentdir, "log");
 
-    log = new HLog(fs, newlogdir, conf);
+    log = new HLog(fs, newlogdir, conf, null);
     desc = new HTableDescriptor("test");
     desc.addFamily(new HColumnDescriptor("contents:"));
     desc.addFamily(new HColumnDescriptor("anchor:"));
     r = new HRegion(parentdir, log, fs, conf, 
-        new HRegionInfo(desc, null, null), null);
+        new HRegionInfo(desc, null, null), null, null);
     region = new HRegionIncommon(r);
   }
 

+ 27 - 32
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestLogRolling.java

@@ -19,10 +19,12 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.dfs.MiniDFSCluster;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -32,7 +34,8 @@ public class TestLogRolling extends HBaseTestCase {
   private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
   private MiniDFSCluster dfs;
   private MiniHBaseCluster cluster;
-  private Path logdir;
+  private HRegionServer server;
+  private HLog log;
   private String tableName;
   private byte[] value;
   
@@ -45,10 +48,14 @@ public class TestLogRolling extends HBaseTestCase {
     try {
       this.dfs = null;
       this.cluster = null;
-      this.logdir = null;
+      this.server = null;
+      this.log = null;
       this.tableName = null;
       this.value = null;
 
+      // Force a region split after every 768KB
+      conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
+
       // We roll the log after every 256 writes
       conf.setInt("hbase.regionserver.maxlogentries", 256);
 
@@ -118,8 +125,8 @@ public class TestLogRolling extends HBaseTestCase {
       // continue
     }
 
-    this.logdir =
-      cluster.getRegionThreads().get(0).getRegionServer().getLog().dir;
+    this.server = cluster.getRegionThreads().get(0).getRegionServer();
+    this.log = server.getLog();
     
     // When the META table can be opened, the region servers are running
     @SuppressWarnings("unused")
@@ -150,21 +157,6 @@ public class TestLogRolling extends HBaseTestCase {
     }
   }
   
-  private int countLogFiles(final boolean print) throws Exception {
-    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir});
-    if (print) {
-      for (int i = 0; i < logfiles.length; i++) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("logfile: " + logfiles[i].toString());
-        }
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("number of log files: " + logfiles.length);
-    }
-    return logfiles.length;
-  }
-  
   /**
    * Tests that logs are deleted
    * 
@@ -172,21 +164,24 @@ public class TestLogRolling extends HBaseTestCase {
    */
   public void testLogRolling() throws Exception {
     tableName = getName();
-    // Force a region split after every 768KB
-    conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
     try {
       startAndWriteData();
-      int count = countLogFiles(true);
-      LOG.info("Finished writing. There are " + count + " log files. " +
-        "Sleeping to let cache flusher and log roller run");
-      while (count > 2) {
-        try {
-          Thread.sleep(1000L);
-        } catch (InterruptedException e) {
-          LOG.info("Sleep interrupted", e);
-        }
-        count = countLogFiles(true);
+      LOG.info("after writing there are " + log.getNumLogFiles() + " log files");
+      
+      // flush all regions
+      
+      List<HRegion> regions =
+        new ArrayList<HRegion>(server.getOnlineRegions().values());
+      for (HRegion r: regions) {
+        r.flushcache();
       }
+      
+      // Now roll the log
+      log.rollWriter();
+      
+      int count = log.getNumLogFiles();
+      LOG.info("after flushing all regions and rolling logs there are " +
+          log.getNumLogFiles() + " log files");
       assertTrue(count <= 2);
     } catch (Exception e) {
       LOG.fatal("unexpected exception", e);

+ 5 - 5
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java

@@ -144,9 +144,9 @@ public class TestScanner extends HBaseTestCase {
           HRegionInfo.encodeRegionName(REGION_INFO.getRegionName()));
       fs.mkdirs(regionDir);
       
-      HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
+      HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null);
 
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
       
       // Write information to the meta table
@@ -169,7 +169,7 @@ public class TestScanner extends HBaseTestCase {
       
       r.close();
       log.rollWriter();
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Verify we can get the data back now that it is on disk.
@@ -210,7 +210,7 @@ public class TestScanner extends HBaseTestCase {
       
       r.close();
       log.rollWriter();
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Validate again
@@ -247,7 +247,7 @@ public class TestScanner extends HBaseTestCase {
       
       r.close();
       log.rollWriter();
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Validate again

+ 4 - 9
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java

@@ -65,11 +65,11 @@ public class TestSplit extends MultiRegionTable {
    */
   public void testBasicSplit() throws Exception {
     HRegion region = null;
-    HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
+    HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null);
     try {
       HTableDescriptor htd = createTableDescriptor(getName());
       HRegionInfo hri = new HRegionInfo(htd, null, null);
-      region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+      region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null);
       basicSplit(region);
     } finally {
       if (region != null) {
@@ -81,7 +81,7 @@ public class TestSplit extends MultiRegionTable {
   
   private void basicSplit(final HRegion region) throws Exception {
     addContent(region, COLFAMILY_NAME3);
-    region.internalFlushcache(region.snapshotMemcaches());
+    region.flushcache();
     Text midkey = new Text();
     assertTrue(region.needsSplit(midkey));
     HRegion [] regions = split(region);
@@ -108,12 +108,7 @@ public class TestSplit extends MultiRegionTable {
       }
       addContent(regions[i], COLFAMILY_NAME2);
       addContent(regions[i], COLFAMILY_NAME1);
-      long startTime = region.snapshotMemcaches();
-      if (startTime == -1) {
-        LOG.info("cache flush not needed");
-      } else {
-        regions[i].internalFlushcache(startTime);
-      }
+      regions[i].flushcache();
     }
     
     // Assert that even if one store file is larger than a reference, the

+ 2 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java

@@ -310,11 +310,11 @@ public class TestTimestamp extends HBaseTestCase {
   }
   
   private HRegion createRegion() throws IOException {
-    HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
+    HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null);
     HTableDescriptor htd = createTableDescriptor(getName());
     htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
       CompressionType.NONE, false, Integer.MAX_VALUE, null));
     HRegionInfo hri = new HRegionInfo(htd, null, null);
-    return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+    return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null);
   }
 }

+ 11 - 2
src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -87,6 +88,9 @@ public class TestTableMapReduce extends MultiRegionTable {
   public TestTableMapReduce() {
     super();
 
+    // The region server doesn't have to talk to the master quite so often
+    conf.setInt("hbase.regionserver.msginterval", 2000);
+    
     // Make the thread wake frequency a little slower so other threads
     // can run
     conf.setInt("hbase.server.thread.wakefrequency", 2000);
@@ -105,6 +109,9 @@ public class TestTableMapReduce extends MultiRegionTable {
     // Make lease timeout longer, lease checks less frequent
     conf.setInt("hbase.master.lease.period", 10 * 1000);
     conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+    
+    // Set client pause to the original default
+    conf.setInt("hbase.client.pause", 10 * 1000);
   }
 
   /**
@@ -381,9 +388,11 @@ public class TestTableMapReduce extends MultiRegionTable {
         assertNotNull(firstValue);
         assertNotNull(secondValue);
         assertEquals(firstValue.length, secondValue.length);
-        for (int i=0; i<firstValue.length; i++) {
-          assertEquals(firstValue[i], secondValue[firstValue.length-i-1]);
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
         }
+        assertTrue(Arrays.equals(firstValue, secondReversed));
       }
       
     } finally {

Some files were not shown because too many files changed in this diff