
HADOOP-1813 OOME makes zombie of region server

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@575928 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 17 years ago
parent commit c9faef7e0c

+ 1 - 0
src/contrib/hbase/CHANGES.txt

@@ -38,6 +38,7 @@ Trunk (unreleased changes)
                 (Ning Li via Stack)
     HADOOP-1800 output should default utf8 encoding
     HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
+    HADOOP-1813 OOME makes zombie of region server
     HADOOP-1814	TestCleanRegionServerExit fails too often on Hudson
     HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
     HADOOP-1832 listTables() returns duplicate tables

+ 5 - 1
src/contrib/hbase/bin/hbase

@@ -206,7 +206,11 @@ else
   CLASS=$COMMAND
 fi
 
-
+# Have the JVM dump its heap if we run out of memory.  Files go to the launch
+# directory and are named like the following: java_pid21612.hprof.  Apparently
+# it doesn't 'cost' anything to have this flag enabled.  It's a 1.6-only flag.
+# See: http://blogs.sun.com/alanb/entry/outofmemoryerror_looks_a_bit_better
+HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.home.dir=$HADOOP_HOME"

+ 13 - 4
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java

@@ -219,7 +219,6 @@ public class HLog implements HConstants {
         // cache-flush.  Otherwise, the log sequence number for
         // the CACHEFLUSH operation will appear in a "newer" log file
         // than it should.
-        
         while(insideCacheFlush) {
           try {
             wait();
@@ -402,14 +401,14 @@ public class HLog implements HConstants {
    * @see #completeCacheFlush(Text, Text, long)
    */
   synchronized long startCacheFlush() {
-    while (insideCacheFlush) {
+    while (this.insideCacheFlush) {
       try {
         wait();
       } catch (InterruptedException ie) {
         // continue
       }
     }
-    insideCacheFlush = true;
+    this.insideCacheFlush = true;
     notifyAll();
     return obtainSeqNum();
   }
@@ -427,7 +426,7 @@ public class HLog implements HConstants {
       return;
     }
     
-    if(! insideCacheFlush) {
+    if (!this.insideCacheFlush) {
       throw new IOException("Impossible situation: inside " +
         "completeCacheFlush(), but 'insideCacheFlush' flag is false");
     }
@@ -444,6 +443,16 @@ public class HLog implements HConstants {
     insideCacheFlush = false;
     notifyAll();
   }
+  
+  /**
+   * Abort a cache flush.
+   * Releases threads waiting on {@link #insideCacheFlush}, but note that if
+   * this method is called, edits are being lost.  TODO: Fix.
+   */
+  synchronized void abort() {
+    this.insideCacheFlush = false;
+    notifyAll();
+  }
 
   private static void usage() {
     System.err.println("Usage: java org.apache.hbase.HLog" +
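
Taken together, startCacheFlush(), completeCacheFlush() and the new abort()
form a single-flusher monitor.  A condensed sketch of just that handshake,
sequence numbers and other HBase internals omitted (FlushLock is an
illustrative name, not part of this commit):

// Only one cache flush may be in progress at a time; abort() and
// completeCacheFlush() make the same state transition, but after abort()
// the caller knows the flushed edits never made it into the log.
class FlushLock {
  private boolean insideCacheFlush = false;

  /** Block until no other flush is in progress, then mark one started. */
  synchronized void startCacheFlush() {
    while (insideCacheFlush) {
      try {
        wait();
      } catch (InterruptedException ie) {
        // loop and re-check the condition
      }
    }
    insideCacheFlush = true;
  }

  /** Normal completion: clear the flag, wake any waiting flushers. */
  synchronized void completeCacheFlush() {
    insideCacheFlush = false;
    notifyAll();
  }

  /** Failure path added by this commit: same wake-up, data may be lost. */
  synchronized void abort() {
    insideCacheFlush = false;
    notifyAll();
  }
}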

File diff suppressed because it is too large
+ 184 - 275
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java


+ 31 - 23
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -834,37 +834,45 @@ public class HRegion implements HConstants {
     // When execution returns from snapshotMemcacheForLog() with a non-NULL
     // value, the HMemcache will have a snapshot object stored that must be
     // explicitly cleaned up using a call to deleteSnapshot().
+    //
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
       LOG.debug("Finished memcache flush; empty snapshot");
       return;
     }
-    long logCacheFlushId = retval.sequenceId;
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Snapshotted memcache for region " +
-        this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
-        " and entries " + retval.memcacheSnapshot.size());
-    }
+    try {
+      long logCacheFlushId = retval.sequenceId;
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Snapshotted memcache for region " +
+            this.regionInfo.regionName + " with sequence id " +
+            retval.sequenceId + " and entries " +
+            retval.memcacheSnapshot.size());
+      }
 
-    // A.  Flush memcache to all the HStores.
-    // Keep running vector of all store files that includes both old and the
-    // just-made new flush store file.
-    for(HStore hstore: stores.values()) {
-      hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
-    }
+      // A.  Flush memcache to all the HStores.
+      // Keep running vector of all store files that includes both old and the
+      // just-made new flush store file.
+      for(HStore hstore: stores.values()) {
+        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+      }
 
-    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
-    //     This tells future readers that the HStores were emitted correctly,
-    //     and that all updates to the log for this regionName that have lower 
-    //     log-sequence-ids can be safely ignored.
+      // B.  Write a FLUSHCACHE-COMPLETE message to the log.
+      //     This tells future readers that the HStores were emitted correctly,
+      //     and that all updates to the log for this regionName that have lower 
+      //     log-sequence-ids can be safely ignored.
+
+      log.completeCacheFlush(this.regionInfo.regionName,
+          regionInfo.tableDesc.getName(), logCacheFlushId);
+    } catch (IOException e) {
+      LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
+      log.abort();
+      throw e;
+    } finally {
+      // C. Delete the now-irrelevant memcache snapshot; its contents have been 
+      //    dumped to disk-based HStores.
+      memcache.deleteSnapshot();
+    }
     
-    log.completeCacheFlush(this.regionInfo.regionName,
-      regionInfo.tableDesc.getName(), logCacheFlushId);
-
-    // C. Delete the now-irrelevant memcache snapshot; its contents have been 
-    //    dumped to disk-based HStores.
-    memcache.deleteSnapshot();
-
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
     synchronized(this) {
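
The comment at the top of this hunk states the contract the new try/finally
enforces: whoever takes a memcache snapshot must delete it, success or
failure.  A minimal sketch of that contract (Cache, takeSnapshot() and
deleteSnapshot() are illustrative stand-ins, not HBase API):

import java.io.IOException;

public class SnapshotContract {
  static class Cache {
    Object takeSnapshot() { return new Object(); }
    void deleteSnapshot() { /* release the snapshot */ }
  }

  static void flush(Cache memcache) throws IOException {
    Object snapshot = memcache.takeSnapshot();
    if (snapshot == null) {
      return; // nothing to flush, so nothing to clean up
    }
    try {
      // ... write the snapshot out to stores, mark the flush complete ...
    } finally {
      memcache.deleteSnapshot(); // always runs, even if the write throws
    }
  }
}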

File diff suppressed because it is too large
+ 317 - 406
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java


+ 38 - 36
src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java

@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Leases
@@ -41,14 +42,13 @@ import java.util.*;
 public class Leases {
   protected static final Log LOG = LogFactory.getLog(Leases.class.getName());
 
-  protected final long leasePeriod;
-  protected final long leaseCheckFrequency;
-  private final LeaseMonitor leaseMonitor;
+  protected final int leasePeriod;
+  protected final int leaseCheckFrequency;
   private final Thread leaseMonitorThread;
   protected final Map<LeaseName, Lease> leases =
     new HashMap<LeaseName, Lease>();
   protected final TreeSet<Lease> sortedLeases = new TreeSet<Lease>();
-  protected boolean running = true;
+  protected AtomicBoolean stop = new AtomicBoolean(false);
 
   /**
    * Creates a lease
@@ -57,18 +57,25 @@ public class Leases {
    * @param leaseCheckFrequency - how often the lease should be checked
    * (milliseconds)
    */
-  public Leases(long leasePeriod, long leaseCheckFrequency) {
+  public Leases(final int leasePeriod, final int leaseCheckFrequency) {
     this.leasePeriod = leasePeriod;
     this.leaseCheckFrequency = leaseCheckFrequency;
-    this.leaseMonitor = new LeaseMonitor();
-    this.leaseMonitorThread = new Thread(leaseMonitor);
-    this.leaseMonitorThread.setName("Lease.monitor");
+    this.leaseMonitorThread =
+      new LeaseMonitor(this.leaseCheckFrequency, this.stop);
+    this.leaseMonitorThread.setDaemon(true);
   }
   
   /** Starts the lease monitor */
   public void start() {
     leaseMonitorThread.start();
   }
+  
+  /**
+   * @param name Set name on the lease checking daemon thread.
+   */
+  public void setName(final String name) {
+    this.leaseMonitorThread.setName(name);
+  }
 
   /**
    * Shuts down this lease instance when all outstanding leases expire.
@@ -99,8 +106,7 @@ public class Leases {
    */
   public void close() {
     LOG.info("closing leases");
-
-    this.running = false;
+    this.stop.set(true);
     try {
       this.leaseMonitorThread.interrupt();
       this.leaseMonitorThread.join();
@@ -196,37 +202,33 @@ public class Leases {
         sortedLeases.remove(lease);
         leases.remove(name);
       }
-    }     
-//    if (LOG.isDebugEnabled()) {
-//      LOG.debug("Cancel lease " + name);
-//    }
+    }
   }
 
-  /** LeaseMonitor is a thread that expires Leases that go on too long. */
-  class LeaseMonitor implements Runnable {
-    /** {@inheritDoc} */
-    public void run() {
-      while(running) {
-        synchronized(leases) {
-          synchronized(sortedLeases) {
-            Lease top;
-            while((sortedLeases.size() > 0)
-                && ((top = sortedLeases.first()) != null)) {
-              if(top.shouldExpire()) {
-                leases.remove(top.getLeaseName());
-                sortedLeases.remove(top);
-                top.expired();
-              } else {
-                break;
-              }
+  /**
+   * LeaseMonitor is a thread that expires Leases that go on too long.
+   * It's a daemon thread.
+   */
+  class LeaseMonitor extends Chore {
+    public LeaseMonitor(int p, AtomicBoolean s) {
+      super(p, s);
+    }
+
+    protected void chore() {
+      synchronized(leases) {
+        synchronized(sortedLeases) {
+          Lease top;
+          while((sortedLeases.size() > 0)
+              && ((top = sortedLeases.first()) != null)) {
+            if(top.shouldExpire()) {
+              leases.remove(top.getLeaseName());
+              sortedLeases.remove(top);
+              top.expired();
+            } else {
+              break;
             }
           }
         }
-        try {
-          Thread.sleep(leaseCheckFrequency);
-        } catch (InterruptedException ie) {
-          // continue
-        }
       }
     }
   }
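
The Chore base class that LeaseMonitor now extends is not shown in this
commit.  A hedged sketch of what it presumably looks like, inferred only from
how LeaseMonitor constructs and uses it (a periodic loop on a Thread, gated
on the shared stop flag):

import java.util.concurrent.atomic.AtomicBoolean;

abstract class Chore extends Thread {
  private final int sleepPeriod;     // milliseconds between chore() runs
  private final AtomicBoolean stop;  // shared stop flag, set on close()

  Chore(int sleepPeriod, AtomicBoolean stop) {
    this.sleepPeriod = sleepPeriod;
    this.stop = stop;
  }

  /** The periodic task a subclass implements. */
  protected abstract void chore();

  @Override
  public void run() {
    while (!stop.get()) {
      chore();
      try {
        Thread.sleep(sleepPeriod);
      } catch (InterruptedException e) {
        // interrupt is the wake-up used on close(); the loop re-checks stop
      }
    }
  }
}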

+ 28 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java

@@ -30,7 +30,29 @@ import org.apache.hadoop.ipc.RemoteException;
  * org.apache.hadoop.ipc.RemoteException exceptions.
  */
 public class RemoteExceptionHandler {
-  private RemoteExceptionHandler(){}                    // not instantiable
+  /* Not instantiable */
+  private RemoteExceptionHandler() {super();}
+  
+  /**
+   * Examine the passed IOException.  See if it's carrying a RemoteException. If so,
+   * run {@link #decodeRemoteException(RemoteException)} on it.  Otherwise,
+   * pass back <code>e</code> unaltered.
+   * @param e Exception to examine.
+   * @return Decoded RemoteException carried by <code>e</code> or
+   * <code>e</code> unaltered.
+   */
+  public static IOException checkIOException(final IOException e) {
+    IOException result = e;
+    if (e instanceof RemoteException) {
+      try {
+        result = RemoteExceptionHandler.decodeRemoteException(
+            (RemoteException) e);
+      } catch (IOException ex) {
+        result = ex;
+      }
+    }
+    return result;
+  }
   
   /**
    * Converts org.apache.hadoop.ipc.RemoteException into original exception,
@@ -69,10 +91,15 @@ public class RemoteExceptionHandler {
       }
 
     } catch (ClassNotFoundException x) {
+      // continue
     } catch (NoSuchMethodException x) {
+      // continue
     } catch (IllegalAccessException x) {
+      // continue
     } catch (InvocationTargetException x) {
+      // continue
     } catch (InstantiationException x) {
+      // continue
     }
     return i;
   }
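
A hypothetical call site for the new checkIOException(), showing the intended
pattern: re-throw the decoded server-side exception rather than the RPC
wrapper.  ClientExample and doRpcCall() are illustrative stand-ins, not part
of this commit.

import java.io.IOException;

import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.ipc.RemoteException;

public class ClientExample {
  void callServer() throws IOException {
    try {
      doRpcCall();
    } catch (IOException e) {
      // Surface the original server-side exception type, not the wrapper.
      throw RemoteExceptionHandler.checkIOException(e);
    }
  }

  // Stand-in for any client RPC call that can throw a wrapped RemoteException.
  private void doRpcCall() throws IOException {
    throw new RemoteException("java.io.FileNotFoundException", "no such region");
  }
}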

+ 22 - 16
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java

@@ -77,7 +77,6 @@ public class MiniHBaseCluster implements HConstants {
    */
   public MiniHBaseCluster(Configuration conf, int nRegionNodes,
       final boolean miniHdfsFilesystem) throws IOException {
-    
     this(conf, nRegionNodes, miniHdfsFilesystem, true, true);
   }
 
@@ -127,7 +126,6 @@ public class MiniHBaseCluster implements HConstants {
       fs.mkdirs(parentdir);
       this.masterThread = startMaster(this.conf);
       this.regionThreads = startRegionServers(this.conf, nRegionNodes);
-
     } catch(IOException e) {
       shutdown();
       throw e;
@@ -233,18 +231,22 @@ public class MiniHBaseCluster implements HConstants {
    * Starts a region server thread running
    * 
    * @throws IOException
+   * @return Name of regionserver started.
    */
-  public void startRegionServer() throws IOException {
+  public String startRegionServer() throws IOException {
     RegionServerThread t =
       startRegionServer(this.conf, this.regionThreads.size());
     this.regionThreads.add(t);
+    return t.getName();
   }
   
   private static RegionServerThread startRegionServer(final Configuration c,
-    final int index) throws IOException {
-    
-    final HRegionServer hsr = new HRegionServer(c);
-    RegionServerThread t = new RegionServerThread(hsr, index);
+    final int index)
+  throws IOException {  
+    final HRegionServer hrs = new HRegionServer(c);
+    RegionServerThread t = new RegionServerThread(hrs, index);
+    t.setName("regionserver" +
+      t.getRegionServer().server.getListenerAddress().toString());
     t.start();
     return t;
   }
@@ -296,8 +298,9 @@ public class MiniHBaseCluster implements HConstants {
    * Wait for the specified region server to stop
    * Removes this thread from list of running threads.
    * @param serverNumber
+   * @return Name of region server that just went down.
    */
-  public void waitOnRegionServer(int serverNumber) {
+  public String waitOnRegionServer(int serverNumber) {
     RegionServerThread regionServerThread =
       this.regionThreads.remove(serverNumber);
     try {
@@ -307,6 +310,7 @@ public class MiniHBaseCluster implements HConstants {
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
+    return regionServerThread.getName();
   }
   
   /**
@@ -353,14 +357,16 @@ public class MiniHBaseCluster implements HConstants {
     if(masterThread != null) {
       masterThread.getMaster().shutdown();
     }
-    synchronized(regionServerThreads) {
-      if (regionServerThreads != null) {
-        for(Thread t: regionServerThreads) {
-          if (t.isAlive()) {
-            try {
-              t.join();
-            } catch (InterruptedException e) {
-              // continue
+    if (regionServerThreads != null) {
+      synchronized(regionServerThreads) {
+        if (regionServerThreads != null) {
+          for(Thread t: regionServerThreads) {
+            if (t.isAlive()) {
+              try {
+                t.join();
+              } catch (InterruptedException e) {
+                // continue
+              }
             }
           }
         }

+ 19 - 0
src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java

@@ -1,3 +1,22 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;

+ 5 - 3
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java

@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.util.TreeMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -30,6 +32,7 @@ import org.apache.log4j.Logger;
  * Tests region server failover when a region server exits.
  */
 public class TestCleanRegionServerExit extends HBaseClusterTestCase {
+  private final Log LOG = LogFactory.getLog(this.getClass());
   private HTable table;
 
   /** constructor */
@@ -65,14 +68,13 @@ public class TestCleanRegionServerExit extends HBaseClusterTestCase {
     table.commit(lockid);
     // Start up a new region server to take over serving of root and meta
     // after we shut down the current meta/root host.
-    this.cluster.startRegionServer();
+    LOG.info("Started " + this.cluster.startRegionServer());
     // Now shutdown the region server and wait for it to go down.
     this.cluster.stopRegionServer(0);
-    this.cluster.waitOnRegionServer(0);
+    LOG.info(this.cluster.waitOnRegionServer(0) + " is down");
     
     // Verify that the client can find the data after the region has been moved
     // to a different server
-
     HScannerInterface scanner =
       table.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, new Text());
 

+ 7 - 6
src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java

@@ -19,6 +19,9 @@
  */
 package org.apache.hadoop.hbase;
 
+import junit.framework.TestSuite;
+import junit.textui.TestRunner;
+
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
@@ -40,10 +43,8 @@ public class TestDFSAbort extends HBaseClusterTestCase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    
     HTableDescriptor desc = new HTableDescriptor(getName());
     desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
-    
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
   }
@@ -52,14 +53,14 @@ public class TestDFSAbort extends HBaseClusterTestCase {
    * @throws Exception
    */
   public void testDFSAbort() throws Exception {
-    
     // By now the Mini DFS is running, Mini HBase is running and we have
     // created a table. Now let's yank the rug out from HBase
-    
     cluster.getDFSCluster().shutdown();
-    
     // Now wait for Mini HBase Cluster to shut down
-    
     cluster.join();
   }
+  
+  public static void main(String[] args) {
+    TestRunner.run(new TestSuite(TestDFSAbort.class));
+  }
 }

Some files were not shown because too many files changed in this diff