Sfoglia il codice sorgente

HADOOP-1003. Remove flushing of namenode edit log from primary namenode lock, increasing namenode throughput. Contributed by Dhruba.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@549210 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 anni fa
parent
commit
f5d13ee1ef

+ 4 - 0
CHANGES.txt

@@ -193,6 +193,10 @@ Trunk (unreleased changes)
      "webinterface.private.actions" to enable this.
      (Enis Soztutar via cutting)
 
+ 60. HADOOP-1003.  Remove flushing of namenode edit log from primary
+     namenode lock, increasing namenode throughput.
+     (Dhruba Borthakur via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

+ 46 - 3
src/java/org/apache/hadoop/dfs/FSEditLog.java

@@ -48,6 +48,9 @@ class FSEditLog {
 
   private ArrayList<EditLogOutputStream> editStreams = null;
   private FSImage fsimage = null;
+
+  private long lastModificationTime;
+  private long lastSyncTime;
   
   static class EditLogOutputStream extends DataOutputStream {
     private FileDescriptor fd;
@@ -70,6 +73,8 @@ class FSEditLog {
 
   FSEditLog(FSImage image) {
     fsimage = image;
+    lastModificationTime = 0;
+    lastSyncTime = 0;
   }
 
   private File getEditFile(int idx) {
@@ -345,9 +350,10 @@ class FSEditLog {
   }
 
   /**
-   * Write an operation to the edit log
+   * Write an operation to the edit log. Do not sync to persistent
+   * store yet.
    */
-  void logEdit(byte op, Writable w1, Writable w2) {
+  synchronized void logEdit(byte op, Writable w1, Writable w2) {
     assert this.getNumEditStreams() > 0 : "no editlog streams";
     for (int idx = 0; idx < editStreams.size(); idx++) {
       EditLogOutputStream eStream;
@@ -360,7 +366,6 @@ class FSEditLog {
           if (w2 != null) {
             w2.write(eStream);
           }
-          eStream.flushAndSync();
         } catch (IOException ie) {
           try {
             processIOError(idx);         
@@ -372,6 +377,44 @@ class FSEditLog {
         }
       }
     }
+    //
+    // record the time when new data was written to the edits log
+    //
+    lastModificationTime = System.currentTimeMillis();
+  }
+
+  //
+  // flush all data of the Edits log into persistent store
+  //
+  synchronized void logSync() {
+    assert this.getNumEditStreams() > 0 : "no editlog streams";
+
+    //
+    // If data was generated before the beginning of the last sync time
+    // then there is nothing to flush
+    //
+    if (lastModificationTime < lastSyncTime) {
+      return;
+    }
+    lastSyncTime = System.currentTimeMillis();
+
+    for (int idx = 0; idx < editStreams.size(); idx++) {
+      EditLogOutputStream eStream;
+      synchronized (eStream = editStreams.get(idx)) {
+        try {
+          eStream.flushAndSync();
+        } catch (IOException ie) {
+          try {
+            processIOError(idx);         
+          } catch (IOException e) {
+            FSNamesystem.LOG.error("Unable to sync edit log. " +
+                                   "Fatal Error.");
+            throw new RuntimeException("Unable to sync edit log. " +
+                                       "Fatal Error.");
+          }
+        }
+      }
+    }
   }
 
   /** 

+ 52 - 7
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -518,7 +518,14 @@ class FSNamesystem implements FSConstants {
    *         false if file does not exist or is a directory
    * @author shv
    */
-  public synchronized boolean setReplication(String src, 
+  public boolean setReplication(String src, short replication) 
+                                throws IOException {
+    boolean status = setReplicationInternal(src, replication);
+    getEditLog().logSync();
+    return status;
+  }
+
+  private synchronized boolean setReplicationInternal(String src, 
                                              short replication
                                              ) throws IOException {
     if (isInSafeMode())
@@ -867,8 +874,14 @@ class FSNamesystem implements FSConstants {
    * Before we return, we make sure that all the file's blocks have 
    * been reported by datanodes and are replicated correctly.
    */
-  public synchronized int completeFile(UTF8 src, 
-                                       UTF8 holder) throws IOException {
+  public int completeFile(UTF8 src, UTF8 holder) throws IOException {
+    int status = completeFileInternal(src, holder);
+    getEditLog().logSync();
+    return status;
+  }
+
+  private synchronized int completeFileInternal(UTF8 src, 
+                                                UTF8 holder) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
     if (isInSafeMode())
       throw new SafeModeException("Cannot complete file " + src, safeMode);
@@ -1081,10 +1094,16 @@ class FSNamesystem implements FSConstants {
   // are made, edit namespace and return to client.
   ////////////////////////////////////////////////////////////////
 
+  public boolean renameTo(UTF8 src, UTF8 dst) throws IOException {
+    boolean status = renameToInternal(src, dst);
+    getEditLog().logSync();
+    return status;
+  }
+
   /**
    * Change the indicated filename.
    */
-  public synchronized boolean renameTo(UTF8 src, UTF8 dst) throws IOException {
+  private synchronized boolean renameToInternal(UTF8 src, UTF8 dst) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
     if (isInSafeMode())
       throw new SafeModeException("Cannot rename " + src, safeMode);
@@ -1098,7 +1117,17 @@ class FSNamesystem implements FSConstants {
    * Remove the indicated filename from the namespace.  This may
    * invalidate some blocks that make up the file.
    */
-  public synchronized boolean delete(UTF8 src) throws IOException {
+  public boolean delete(UTF8 src) throws IOException {
+    boolean status = deleteInternal(src);
+    getEditLog().logSync();
+    return status;
+  }
+
+  /**
+   * Remove the indicated filename from the namespace.  This may
+   * invalidate some blocks that make up the file.
+   */
+  private synchronized boolean deleteInternal(UTF8 src) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
     if (isInSafeMode())
       throw new SafeModeException("Cannot delete " + src, safeMode);
@@ -1163,11 +1192,19 @@ class FSNamesystem implements FSConstants {
     }
     return true;
   }
+  /**
+   * Create all the necessary directories
+   */
+  public boolean mkdirs(String src) throws IOException {
+    boolean status = mkdirsInternal(src);
+    getEditLog().logSync();
+    return status;
+  }
     
   /**
    * Create all the necessary directories
    */
-  public synchronized boolean mkdirs(String src) throws IOException {
+  private synchronized boolean mkdirsInternal(String src) throws IOException {
     boolean    success;
     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     if (isInSafeMode())
@@ -1447,7 +1484,15 @@ class FSNamesystem implements FSConstants {
    * @see DataNode#register()
    * @author Konstantin Shvachko
    */
-  public synchronized void registerDatanode(DatanodeRegistration nodeReg,
+  public void registerDatanode(DatanodeRegistration nodeReg,
+                               String networkLocation
+                               ) throws IOException {
+    registerDatanodeInternal(nodeReg, networkLocation);
+    getEditLog().logSync();
+  }
+
+  private synchronized void registerDatanodeInternal(
+                                            DatanodeRegistration nodeReg,
                                             String networkLocation
                                             ) throws IOException {