Browse Source

HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
to disk.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@575982 13f79535-47bb-0310-9956-ffa450edef68

Michael Stack 17 years ago
parent
commit
e45d42ca54

+ 2 - 0
src/contrib/hbase/CHANGES.txt

@@ -48,6 +48,8 @@ Trunk (unreleased changes)
     HADOOP-1870 Once file system failure has been detected, don't check it again
     HADOOP-1870 Once file system failure has been detected, don't check it again
                 and get on with shutting down the hbase cluster.
                 and get on with shutting down the hbase cluster.
     HADOOP-1888 NullPointerException in HMemcacheScanner
     HADOOP-1888 NullPointerException in HMemcacheScanner
+    HADOOP-1903 Possible data loss if Exception happens between snapshot and flush
+                to disk.
 
 
   IMPROVEMENTS
   IMPROVEMENTS
     HADOOP-1737 Make HColumnDescriptor data publically members settable
     HADOOP-1737 Make HColumnDescriptor data publically members settable

+ 32 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/DroppedSnapshotException.java

@@ -0,0 +1,32 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+import java.io.IOException;
+
+
+/**
+ * Thrown during a flush when snapshot content may not have been properly
+ * persisted into store files.  Response should include replay of hlog content.
+ */
+public class DroppedSnapshotException extends IOException {
+  public DroppedSnapshotException(String msg) {
+    super(msg);
+  }
+
+  public DroppedSnapshotException() {
+    super();
+  }
+}

+ 22 - 6
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java

@@ -399,6 +399,7 @@ public class HLog implements HConstants {
    * the flush will not appear in the correct logfile.
    * the flush will not appear in the correct logfile.
    * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
    * @see #completeCacheFlush(Text, Text, long)
    * @see #completeCacheFlush(Text, Text, long)
+   * @see #abortCacheFlush()
    */
    */
   synchronized long startCacheFlush() {
   synchronized long startCacheFlush() {
     while (this.insideCacheFlush) {
     while (this.insideCacheFlush) {
@@ -422,7 +423,7 @@ public class HLog implements HConstants {
   synchronized void completeCacheFlush(final Text regionName,
   synchronized void completeCacheFlush(final Text regionName,
     final Text tableName, final long logSeqId)
     final Text tableName, final long logSeqId)
   throws IOException {
   throws IOException {
-    if(closed) {
+    if(this.closed) {
       return;
       return;
     }
     }
     
     
@@ -430,17 +431,32 @@ public class HLog implements HConstants {
       throw new IOException("Impossible situation: inside " +
       throw new IOException("Impossible situation: inside " +
         "completeCacheFlush(), but 'insideCacheFlush' flag is false");
         "completeCacheFlush(), but 'insideCacheFlush' flag is false");
     }
     }
-    
-    writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+    HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
+    this.writer.append(key,
       new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
       new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
         System.currentTimeMillis()));
         System.currentTimeMillis()));
-    numEntries.getAndIncrement();
+    this.numEntries.getAndIncrement();
 
 
     // Remember the most-recent flush for each region.
     // Remember the most-recent flush for each region.
     // This is used to delete obsolete log files.
     // This is used to delete obsolete log files.
-    regionToLastFlush.put(regionName, logSeqId);
+    this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
 
 
-    insideCacheFlush = false;
+    cleanup();
+  }
+  
+  /**
+   * Abort a cache flush.
+   * This method clears {@link #insideCacheFlush} and wakes any waiters.  Call
+   * if the flush fails.  Note that the only recovery for an aborted flush
+   * currently is a restart of the regionserver so the snapshot content dropped
+   * by the failure gets restored to the memcache.
+   */
+  synchronized void abortCacheFlush() {
+    cleanup();
+  }
+  
+  private synchronized void cleanup() {
+    this.insideCacheFlush = false;
     notifyAll();
     notifyAll();
   }
   }
   
   

+ 1 - 1
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java

@@ -657,7 +657,7 @@ HMasterRegionInterface {
               if (checkFileSystem()) {
               if (checkFileSystem()) {
                 // If filesystem is OK, is the exception a ConnectionException?
                 // If filesystem is OK, is the exception a ConnectionException?
                 // If so, mark the server as down.  No point scanning either
                 // If so, mark the server as down.  No point scanning either
-                // if no server to put meta region on.
+                // if no server to put meta region on. TODO.
                 if (e instanceof ConnectException) {
                 if (e instanceof ConnectException) {
                   LOG.debug("Region hosting server is gone.");
                   LOG.debug("Region hosting server is gone.");
                 }
                 }

+ 1 - 0
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java

@@ -101,6 +101,7 @@ public class HMemcache {
       }
       }
       Snapshot retval =
       Snapshot retval =
         new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
         new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
+      // From here on, any failure is catastrophic requiring replay of hlog
       this.snapshot = memcache;
       this.snapshot = memcache;
       history.add(memcache);
       history.add(memcache);
       memcache = new TreeMap<HStoreKey, byte []>();
       memcache = new TreeMap<HStoreKey, byte []>();

+ 27 - 11
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java

@@ -721,6 +721,9 @@ public class HRegion implements HConstants {
   /**
   /**
    * Each HRegion is given a periodic chance to flush the cache, which it should
    * Each HRegion is given a periodic chance to flush the cache, which it should
    * only take if there have been a lot of uncommitted writes.
    * only take if there have been a lot of uncommitted writes.
+   * @throws IOException
+   * @throws DroppedSnapshotException Thrown when replay of hlog is required
+   * because a Snapshot was not properly persisted.
    */
    */
   void optionallyFlush() throws IOException {
   void optionallyFlush() throws IOException {
     if(this.memcache.getSize() > this.memcacheFlushSize) {
     if(this.memcache.getSize() > this.memcacheFlushSize) {
@@ -754,6 +757,9 @@ public class HRegion implements HConstants {
    * close() the HRegion shortly, so the HRegion should not take on any new and 
    * close() the HRegion shortly, so the HRegion should not take on any new and 
    * potentially long-lasting disk operations. This flush() should be the final
    * potentially long-lasting disk operations. This flush() should be the final
    * pre-close() disk operation.
    * pre-close() disk operation.
+   * @throws IOException
+   * @throws DroppedSnapshotException Thrown when replay of hlog is required
+   * because a Snapshot was not properly persisted.
    */
    */
   void flushcache(boolean disableFutureWrites)
   void flushcache(boolean disableFutureWrites)
   throws IOException {
   throws IOException {
@@ -815,6 +821,9 @@ public class HRegion implements HConstants {
    * routes.
    * routes.
    * 
    * 
    * <p> This method may block for some time.
    * <p> This method may block for some time.
+   * @throws IOException
+   * @throws DroppedSnapshotException Thrown when replay of hlog is required
+   * because a Snapshot was not properly persisted.
    */
    */
   void internalFlushcache() throws IOException {
   void internalFlushcache() throws IOException {
     long startTime = -1;
     long startTime = -1;
@@ -833,13 +842,19 @@ public class HRegion implements HConstants {
     //
     //
     // When execution returns from snapshotMemcacheForLog() with a non-NULL
     // When execution returns from snapshotMemcacheForLog() with a non-NULL
     // value, the HMemcache will have a snapshot object stored that must be
     // value, the HMemcache will have a snapshot object stored that must be
-    // explicitly cleaned up using a call to deleteSnapshot().
+    // explicitly cleaned up using a call to deleteSnapshot() or by calling
+    // abort.
     //
     //
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
     if(retval == null || retval.memcacheSnapshot == null) {
       LOG.debug("Finished memcache flush; empty snapshot");
       LOG.debug("Finished memcache flush; empty snapshot");
       return;
       return;
     }
     }
+
+    // Any failure from here on out will be catastrophic, requiring a server
+    // restart so hlog content can be replayed and put back into the memcache.
+    // Otherwise the snapshot content, while backed up in the hlog, will not
+    // be part of the currently running server's state.
     try {
     try {
       long logCacheFlushId = retval.sequenceId;
       long logCacheFlushId = retval.sequenceId;
       if(LOG.isDebugEnabled()) {
       if(LOG.isDebugEnabled()) {
@@ -852,7 +867,7 @@ public class HRegion implements HConstants {
       // A.  Flush memcache to all the HStores.
       // A.  Flush memcache to all the HStores.
       // Keep running vector of all store files that includes both old and the
       // Keep running vector of all store files that includes both old and the
       // just-made new flush store file.
       // just-made new flush store file.
-      for(HStore hstore: stores.values()) {
+      for (HStore hstore: stores.values()) {
         hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
         hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
       }
       }
 
 
@@ -860,17 +875,18 @@ public class HRegion implements HConstants {
       //     This tells future readers that the HStores were emitted correctly,
       //     This tells future readers that the HStores were emitted correctly,
       //     and that all updates to the log for this regionName that have lower 
       //     and that all updates to the log for this regionName that have lower 
       //     log-sequence-ids can be safely ignored.
       //     log-sequence-ids can be safely ignored.
-
-      log.completeCacheFlush(this.regionInfo.regionName,
-          regionInfo.tableDesc.getName(), logCacheFlushId);
+      this.log.completeCacheFlush(this.regionInfo.regionName,
+        regionInfo.tableDesc.getName(), logCacheFlushId);
     } catch (IOException e) {
     } catch (IOException e) {
-      LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
-      log.abort();
-      throw e;
+      // An exception here means that the snapshot was not persisted.
+      // The hlog needs to be replayed so its content is restored to memcache.
+      // Currently, only a server restart will do this.
+      this.log.abortCacheFlush();
+      throw new DroppedSnapshotException(e.getMessage());
     } finally {
     } finally {
       // C. Delete the now-irrelevant memcache snapshot; its contents have been 
       // C. Delete the now-irrelevant memcache snapshot; its contents have been 
-      //    dumped to disk-based HStores.
-      memcache.deleteSnapshot();
+      //    dumped to disk-based HStores or, if error, clear aborted snapshot.
+      this.memcache.deleteSnapshot();
     }
     }
     
     
     // D. Finally notify anyone waiting on memcache to clear:
     // D. Finally notify anyone waiting on memcache to clear:
@@ -1386,7 +1402,7 @@ public class HRegion implements HConstants {
   }
   }
    
    
   /* 
   /* 
-   * Add updates to the log and add values to the memcache.
+   * Add updates first to the hlog and then add values to memcache.
    * Warning: Assumption is caller has lock on passed in row.
    * Warning: Assumption is caller has lock on passed in row.
    * @param row Row to update.
    * @param row Row to update.
    * @param timestamp Timestamp to record the updates against
    * @param timestamp Timestamp to record the updates against

+ 62 - 51
src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

@@ -292,6 +292,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       for(HRegion cur: nonClosedRegionsToFlush) {
       for(HRegion cur: nonClosedRegionsToFlush) {
         try {
         try {
           cur.optionallyFlush();
           cur.optionallyFlush();
+        } catch (DroppedSnapshotException e) {
+          // Cache flush can fail in a few places.  If it fails in a critical
+          // section, we get a DroppedSnapshotException and a replay of hlog
+          // is required. Currently the only way to do this is a restart of
+          // the server.
+          LOG.fatal("Replay of hlog required. Forcing server restart", e);
+          if (!checkFileSystem()) {
+            break;
+          }
+          HRegionServer.this.stop();
         } catch (IOException iex) {
         } catch (IOException iex) {
           LOG.error("Cache flush failed",
           LOG.error("Cache flush failed",
             RemoteExceptionHandler.checkIOException(iex));
             RemoteExceptionHandler.checkIOException(iex));
@@ -442,11 +452,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
 
   /**
   /**
    * Sets a flag that will cause all the HRegionServer threads to shut down
    * Sets a flag that will cause all the HRegionServer threads to shut down
-   * in an orderly fashion.
-   * <p>FOR DEBUGGING ONLY
+   * in an orderly fashion.  Used by unit tests and called by {@link Flusher}
+   * if it judges server needs to be restarted.
    */
    */
   synchronized void stop() {
   synchronized void stop() {
-    stopRequested.set(true);
+    this.stopRequested.set(true);
     notifyAll();                        // Wakes run() if it is sleeping
     notifyAll();                        // Wakes run() if it is sleeping
   }
   }
   
   
@@ -457,7 +467,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * from under hbase or we OOME.
    * from under hbase or we OOME.
    */
    */
   synchronized void abort() {
   synchronized void abort() {
-    abortRequested = true;
+    this.abortRequested = true;
     stop();
     stop();
   }
   }
 
 
@@ -621,7 +631,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       if (this.fsOk) {
       if (this.fsOk) {
         // Only try to clean up if the file system is available
         // Only try to clean up if the file system is available
         try {
         try {
-          log.close();
+          this.log.close();
           LOG.info("On abort, closed hlog");
           LOG.info("On abort, closed hlog");
         } catch (IOException e) {
         } catch (IOException e) {
           LOG.error("Unable to close log in abort",
           LOG.error("Unable to close log in abort",
@@ -661,7 +671,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
     }
 
 
     join(); 
     join(); 
-    LOG.info("main thread exiting");
+    LOG.info(Thread.currentThread().getName() + " exiting");
   }
   }
 
 
   /*
   /*
@@ -674,7 +684,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * run.  On its way out, this server will shut down Server.  Leases are sort
    * run.  On its way out, this server will shut down Server.  Leases are sort
    * of inbetween. It has an internal thread that while it inherits from
    * of inbetween. It has an internal thread that while it inherits from
    * Chore, it keeps its own internal stop mechanism so needs to be stopped
    * Chore, it keeps its own internal stop mechanism so needs to be stopped
-   * by this hosting server.
+   * by this hosting server.  Worker logs the exception and exits.
    */
    */
   private void startAllServices() {
   private void startAllServices() {
     String n = Thread.currentThread().getName();
     String n = Thread.currentThread().getName();
@@ -731,6 +741,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
     }
   }
   }
 
 
+
   /** Add to the outbound message buffer */
   /** Add to the outbound message buffer */
   private void reportOpen(HRegion region) {
   private void reportOpen(HRegion region) {
     synchronized(outboundMsgs) {
     synchronized(outboundMsgs) {
@@ -790,58 +801,58 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     
     
     public void run() {
     public void run() {
       try {
       try {
-      for(ToDoEntry e = null; !stopRequested.get(); ) {
-        try {
-          e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException ex) {
-          // continue
-        }
-        if(e == null || stopRequested.get()) {
-          continue;
-        }
-        try {
-          LOG.info(e.msg.toString());
-          
-          switch(e.msg.getMsg()) {
+        for(ToDoEntry e = null; !stopRequested.get(); ) {
+          try {
+            e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException ex) {
+            // continue
+          }
+          if(e == null || stopRequested.get()) {
+            continue;
+          }
+          try {
+            LOG.info(e.msg.toString());
+            switch(e.msg.getMsg()) {
 
 
-          case HMsg.MSG_REGION_OPEN:
-            // Open a region
-            openRegion(e.msg.getRegionInfo());
-            break;
+            case HMsg.MSG_REGION_OPEN:
+              // Open a region
+              openRegion(e.msg.getRegionInfo());
+              break;
 
 
-          case HMsg.MSG_REGION_CLOSE:
-            // Close a region
-            closeRegion(e.msg.getRegionInfo(), true);
-            break;
+            case HMsg.MSG_REGION_CLOSE:
+              // Close a region
+              closeRegion(e.msg.getRegionInfo(), true);
+              break;
 
 
-          case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
-            // Close a region, don't reply
-            closeRegion(e.msg.getRegionInfo(), false);
-            break;
+            case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
+              // Close a region, don't reply
+              closeRegion(e.msg.getRegionInfo(), false);
+              break;
 
 
-          default:
-            throw new AssertionError(
-                "Impossible state during msg processing.  Instruction: "
-                + e.msg.toString());
-          }
-        } catch (IOException ie) {
-          ie = RemoteExceptionHandler.checkIOException(ie);
-          if(e.tries < numRetries) {
-            LOG.warn(ie);
-            e.tries++;
-            try {
-              toDo.put(e);
-            } catch (InterruptedException ex) {
-              throw new RuntimeException("Putting into msgQueue was interrupted.", ex);
+            default:
+              throw new AssertionError(
+                  "Impossible state during msg processing.  Instruction: "
+                  + e.msg.toString());
             }
             }
-          } else {
-            LOG.error("unable to process message: " + e.msg.toString(), ie);
-            if (!checkFileSystem()) {
-              break;
+          } catch (IOException ie) {
+            ie = RemoteExceptionHandler.checkIOException(ie);
+            if(e.tries < numRetries) {
+              LOG.warn(ie);
+              e.tries++;
+              try {
+                toDo.put(e);
+              } catch (InterruptedException ex) {
+                throw new RuntimeException("Putting into msgQueue was " +
+                  "interrupted.", ex);
+              }
+            } else {
+              LOG.error("unable to process message: " + e.msg.toString(), ie);
+              if (!checkFileSystem()) {
+                break;
+              }
             }
             }
           }
           }
         }
         }
-      }
       } catch(Throwable t) {
       } catch(Throwable t) {
         LOG.fatal("Unhandled exception", t);
         LOG.fatal("Unhandled exception", t);
       } finally {
       } finally {