Browse Source

HDFS-2701. Cleanup FS* processIOError methods. Contributed by Eli Collins

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1221098 13f79535-47bb-0310-9956-ffa450edef68
Eli Collins 13 years ago
parent
commit
d755cb260f

+ 2 - 0
CHANGES.txt

@@ -117,6 +117,8 @@ Release 1.1.0 - unreleased
 
 
     HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
     HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
 
 
+    HDFS-2701. Cleanup FS* processIOError methods. (eli)
+
 Release 1.0.0 - unreleased
 Release 1.0.0 - unreleased
 
 
   NEW FEATURES
   NEW FEATURES

+ 87 - 83
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -304,11 +304,13 @@ public class FSEditLog {
   }
   }
   
   
   private int getNumStorageDirs() {
   private int getNumStorageDirs() {
- int numStorageDirs = 0;
- for (Iterator<StorageDirectory> it = 
-       fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext(); it.next())
-   numStorageDirs++;
-    return numStorageDirs;
+   int numStorageDirs = 0;
+   Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+   while (it.hasNext()) {
+     numStorageDirs++;
+     it.next();
+   }
+   return numStorageDirs;
   }
   }
   
   
   synchronized int getNumEditStreams() {
   synchronized int getNumEditStreams() {
@@ -327,18 +329,18 @@ public class FSEditLog {
    */
    */
   public synchronized void open() throws IOException {
   public synchronized void open() throws IOException {
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
-    if (editStreams == null)
+    if (editStreams == null) {
       editStreams = new ArrayList<EditLogOutputStream>();
       editStreams = new ArrayList<EditLogOutputStream>();
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    }
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS); 
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
       File eFile = getEditFile(sd);
       File eFile = getEditFile(sd);
       try {
       try {
         EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
         EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
         editStreams.add(eStream);
         editStreams.add(eStream);
-      } catch (IOException e) {
+      } catch (IOException ioe) {
         FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
         FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
-        // Remove the directory from list of storage directories
         it.remove();
         it.remove();
       }
       }
     }
     }
@@ -372,8 +374,8 @@ public class FSEditLog {
         eStream.setReadyToFlush();
         eStream.setReadyToFlush();
         eStream.flush();
         eStream.flush();
         eStream.close();
         eStream.close();
-      } catch (IOException e) {
-        processIOError(idx);
+      } catch (IOException ioe) {
+        removeEditsAndStorageDir(idx);
         idx--;
         idx--;
       }
       }
     }
     }
@@ -381,75 +383,80 @@ public class FSEditLog {
   }
   }
 
 
   /**
   /**
-   * If there is an IO Error on any log operations, remove that
-   * directory from the list of directories.
-   * If no more directories remain, then exit.
+   * Exit the NN process if there will be no valid edit streams 
+   * remaining after removing one. This method is called before
+   * removing the stream which is why we fail even if there is
+   * still one stream.
    */
    */
-  synchronized void processIOError(int index) {
+  private void exitIfInvalidStreams() {
     if (editStreams == null || editStreams.size() <= 1) {
     if (editStreams == null || editStreams.size() <= 1) {
       FSNamesystem.LOG.fatal(
       FSNamesystem.LOG.fatal(
-      "Fatal Error : All storage directories are inaccessible."); 
+          "Fatal Error: No edit streams are accessible");
       Runtime.getRuntime().exit(-1);
       Runtime.getRuntime().exit(-1);
     }
     }
-    assert(index < getNumStorageDirs());
-    assert(getNumStorageDirs() == editStreams.size());
+  }
+
+  /**
+   * @return the storage directory for the given edit stream. 
+   */
+  private File getStorageDirForStream(int idx) {
+    File editsFile =
+      ((EditLogFileOutputStream)editStreams.get(idx)).getFile();
+    // Namedir is the parent of current which is the parent of edits
+    return editsFile.getParentFile().getParentFile();
+  }
+
+  /**
+   * Remove the given edits stream and its containing storage dir.
+   * Exit the NN process if we have insufficient streams.
+   */
+  synchronized void removeEditsAndStorageDir(int idx) {
+    exitIfInvalidStreams();
+
+    assert idx < getNumStorageDirs();
+    assert getNumStorageDirs() == editStreams.size();
     
     
-    File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                      .get(index)).getFile()
-                                      .getParentFile().getParentFile();
-    editStreams.remove(index);
-    //
-    // Invoke the ioerror routine of the fsimage
-    //
-    fsimage.processIOError(parentStorageDir);
+    File dir = getStorageDirForStream(idx);
+    editStreams.remove(idx);
+    fsimage.removeStorageDir(dir);
   }
   }
-  
+
   /**
   /**
-   * If there is an IO Error on any log operations on storage directory,
-   * remove any stream associated with that directory 
+   * Remove all edits streams for the given storage directory.
+   * Exit the NN process if we have insufficient streams.
    */
    */
-  synchronized void processIOError(StorageDirectory sd) {
-    // Try to remove stream only if one should exist
-    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
+  synchronized void removeEditsForStorageDir(StorageDirectory sd) {
+    if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
       return;
       return;
-    if (editStreams == null || editStreams.size() <= 1) {
-      FSNamesystem.LOG.fatal(
-          "Fatal Error : All storage directories are inaccessible."); 
-      Runtime.getRuntime().exit(-1);
     }
     }
+
+    exitIfInvalidStreams();
+
     for (int idx = 0; idx < editStreams.size(); idx++) {
     for (int idx = 0; idx < editStreams.size(); idx++) {
-      File parentStorageDir = ((EditLogFileOutputStream)editStreams
-                                       .get(idx)).getFile()
-                                       .getParentFile().getParentFile();
-      if (parentStorageDir.getName().equals(sd.getRoot().getName()))
+      File parentDir = getStorageDirForStream(idx);
+      if (parentDir.getName().equals(sd.getRoot().getName())) {
         editStreams.remove(idx);
         editStreams.remove(idx);
- }
+      }
+    }
   }
   }
   
   
   /**
   /**
-   * The specified streams have IO errors. Remove them from logging
-   * new transactions.
+   * Remove each of the given edits streams and their corresponding
+   * storage directories.
    */
    */
-  private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
+  private void removeEditsStreamsAndStorageDirs(
+      ArrayList<EditLogOutputStream> errorStreams) {
     if (errorStreams == null) {
     if (errorStreams == null) {
-      return;                       // nothing to do
+      return;
     }
     }
-    for (int idx = 0; idx < errorStreams.size(); idx++) {
-      EditLogOutputStream eStream = errorStreams.get(idx);
-      int j = 0;
-      int numEditStreams = editStreams.size();
-      for (j = 0; j < numEditStreams; j++) {
-        if (editStreams.get(j) == eStream) {
-          break;
-        }
+    for (EditLogOutputStream errorStream : errorStreams) {
+      int idx = editStreams.indexOf(errorStream);
+      if (-1 == idx) {
+        FSNamesystem.LOG.error(
+            "Fatal Error: Unable to find edits stream with IO error");
+        Runtime.getRuntime().exit(-1);
       }
       }
-      if (j == numEditStreams) {
-          FSNamesystem.LOG.error("Unable to find sync log on which " +
-                                 " IO error occured. " +
-                                 "Fatal Error.");
-          Runtime.getRuntime().exit(-1);
-      }
-      processIOError(j);
+      removeEditsAndStorageDir(idx);
     }
     }
     fsimage.incrementCheckpointTime();
     fsimage.incrementCheckpointTime();
   }
   }
@@ -458,8 +465,8 @@ public class FSEditLog {
    * check if ANY edits.new log exists
    * check if ANY edits.new log exists
    */
    */
   boolean existsNew() throws IOException {
   boolean existsNew() throws IOException {
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    while (it.hasNext()) {
       if (getEditNewFile(it.next()).exists()) { 
       if (getEditNewFile(it.next()).exists()) { 
         return true;
         return true;
       }
       }
@@ -909,10 +916,8 @@ public class FSEditLog {
       EditLogOutputStream eStream = editStreams.get(idx);
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
       try {
         eStream.write(op, writables);
         eStream.write(op, writables);
-      } catch (IOException ie) {
-        processIOError(idx);         
-        // processIOError will remove the idx's stream 
-        // from the editStreams collection, so we need to update idx
+      } catch (IOException ioe) {
+        removeEditsAndStorageDir(idx);
         idx--; 
         idx--; 
       }
       }
     }
     }
@@ -983,7 +988,7 @@ public class FSEditLog {
       EditLogOutputStream eStream = editStreams.get(idx);
       EditLogOutputStream eStream = editStreams.get(idx);
       try {
       try {
         eStream.flush();
         eStream.flush();
-      } catch (IOException ie) {
+      } catch (IOException ioe) {
         //
         //
         // remember the streams that encountered an error.
         // remember the streams that encountered an error.
         //
         //
@@ -991,14 +996,13 @@ public class FSEditLog {
           errorStreams = new ArrayList<EditLogOutputStream>(1);
           errorStreams = new ArrayList<EditLogOutputStream>(1);
         }
         }
         errorStreams.add(eStream);
         errorStreams.add(eStream);
-        FSNamesystem.LOG.error("Unable to sync edit log. " +
-                               "Fatal Error.");
+        FSNamesystem.LOG.error("Unable to sync "+eStream.getName());
       }
       }
     }
     }
     long elapsed = FSNamesystem.now() - start;
     long elapsed = FSNamesystem.now() - start;
 
 
     synchronized (this) {
     synchronized (this) {
-       processIOError(errorStreams);
+       removeEditsStreamsAndStorageDirs(errorStreams);
        synctxid = syncStart;
        synctxid = syncStart;
        isSyncRunning = false;
        isSyncRunning = false;
        this.notifyAll();
        this.notifyAll();
@@ -1217,34 +1221,34 @@ public class FSEditLog {
     // exists in all directories.
     // exists in all directories.
     //
     //
     if (existsNew()) {
     if (existsNew()) {
-      for (Iterator<StorageDirectory> it = 
-               fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+      Iterator<StorageDirectory> it =
+        fsimage.dirIterator(NameNodeDirType.EDITS);
+      while (it.hasNext()) {
         File editsNew = getEditNewFile(it.next());
         File editsNew = getEditNewFile(it.next());
-     if (!editsNew.exists()) { 
-          throw new IOException("Inconsistent existance of edits.new " +
-                                editsNew);
+        if (!editsNew.exists()) {
+          throw new IOException(
+              "Inconsistent existence of edits.new " + editsNew);
         }
         }
       }
       }
       return; // nothing to do, edits.new exists!
       return; // nothing to do, edits.new exists!
     }
     }
 
 
-    close();                     // close existing edit log
+    close(); // close existing edit log
 
 
     //
     //
     // Open edits.new
     // Open edits.new
     //
     //
-    for (Iterator<StorageDirectory> it = 
-           fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
+    Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
       try {
       try {
         EditLogFileOutputStream eStream = 
         EditLogFileOutputStream eStream = 
              new EditLogFileOutputStream(getEditNewFile(sd));
              new EditLogFileOutputStream(getEditNewFile(sd));
         eStream.create();
         eStream.create();
         editStreams.add(eStream);
         editStreams.add(eStream);
-      } catch (IOException e) {
-        // remove stream and this storage directory from list
-        processIOError(sd);
-       it.remove();
+      } catch (IOException ioe) {
+        removeEditsForStorageDir(sd);
+        it.remove();
       }
       }
     }
     }
   }
   }

+ 33 - 48
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -171,9 +171,9 @@ public class FSImage extends Storage {
   void setStorageDirectories(Collection<File> fsNameDirs,
   void setStorageDirectories(Collection<File> fsNameDirs,
                         Collection<File> fsEditsDirs
                         Collection<File> fsEditsDirs
                              ) throws IOException {
                              ) throws IOException {
-    this.storageDirs = new ArrayList<StorageDirectory>();
-    this.removedStorageDirs = new ArrayList<StorageDirectory>();
-   // Add all name dirs with appropriate NameNodeDirType 
+    storageDirs = new ArrayList<StorageDirectory>();
+    removedStorageDirs = new ArrayList<StorageDirectory>();
+    // Add all name dirs with appropriate NameNodeDirType 
     for (File dirName : fsNameDirs) {
     for (File dirName : fsNameDirs) {
       boolean isAlsoEdits = false;
       boolean isAlsoEdits = false;
       for (File editsDirName : fsEditsDirs) {
       for (File editsDirName : fsEditsDirs) {
@@ -186,13 +186,12 @@ public class FSImage extends Storage {
       NameNodeDirType dirType = (isAlsoEdits) ?
       NameNodeDirType dirType = (isAlsoEdits) ?
                           NameNodeDirType.IMAGE_AND_EDITS :
                           NameNodeDirType.IMAGE_AND_EDITS :
                           NameNodeDirType.IMAGE;
                           NameNodeDirType.IMAGE;
-      this.addStorageDir(new StorageDirectory(dirName, dirType));
+      addStorageDir(new StorageDirectory(dirName, dirType));
     }
     }
     
     
     // Add edits dirs if they are different from name dirs
     // Add edits dirs if they are different from name dirs
     for (File dirName : fsEditsDirs) {
     for (File dirName : fsEditsDirs) {
-      this.addStorageDir(new StorageDirectory(dirName, 
-                    NameNodeDirType.EDITS));
+      addStorageDir(new StorageDirectory(dirName, NameNodeDirType.EDITS)); 
     }
     }
   }
   }
 
 
@@ -207,9 +206,9 @@ public class FSImage extends Storage {
   }
   }
   
   
   List<StorageDirectory> getRemovedStorageDirs() {
   List<StorageDirectory> getRemovedStorageDirs() {
-	  return this.removedStorageDirs;
+	  return removedStorageDirs;
   }
   }
-  
+
   File getEditFile(StorageDirectory sd) {
   File getEditFile(StorageDirectory sd) {
     return getImageFile(sd, NameNodeFile.EDITS);
     return getImageFile(sd, NameNodeFile.EDITS);
   }
   }
@@ -604,27 +603,21 @@ public class FSImage extends Storage {
   }
   }
 
 
   /**
   /**
-   * Record new checkpoint time in order to
+   * Record new checkpoint time in each storage dir in order to
    * distinguish healthy directories from the removed ones.
    * distinguish healthy directories from the removed ones.
    * If there is an error writing new checkpoint time, the corresponding
    * If there is an error writing new checkpoint time, the corresponding
    * storage directory is removed from the list.
    * storage directory is removed from the list.
    */
    */
   void incrementCheckpointTime() {
   void incrementCheckpointTime() {
     this.checkpointTime++;
     this.checkpointTime++;
-    
-    // Write new checkpoint time in all storage directories
-    for(Iterator<StorageDirectory> it =
-                          dirIterator(); it.hasNext();) {
+
+    Iterator<StorageDirectory> it = dirIterator();
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
       try {
       try {
         writeCheckpointTime(sd);
         writeCheckpointTime(sd);
-      } catch(IOException e) {
-        // Close any edits stream associated with this dir and remove directory
-        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-          editLog.processIOError(sd);
-        }
-
-        //add storage to the removed list
+      } catch (IOException ioe) {
+        editLog.removeEditsForStorageDir(sd);
         removedStorageDirs.add(sd);
         removedStorageDirs.add(sd);
         it.remove();
         it.remove();
       }
       }
@@ -632,16 +625,14 @@ public class FSImage extends Storage {
   }
   }
   
   
   /**
   /**
-   * Remove storage directory given directory
+   * Remove the given storage directory.
    */
    */
-  
-  void processIOError(File dirName) {
-    for (Iterator<StorageDirectory> it = 
-      dirIterator(); it.hasNext();) {
+  void removeStorageDir(File dir) {
+    Iterator<StorageDirectory> it = dirIterator(); 
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
-      if (sd.getRoot().getPath().equals(dirName.getPath())) {
-        //add storage to the removed list
-        LOG.info(" removing " + dirName.getPath());
+      if (sd.getRoot().getPath().equals(dir.getPath())) {
+        LOG.info("Removing " + dir.getPath());
         removedStorageDirs.add(sd);
         removedStorageDirs.add(sd);
         it.remove();
         it.remove();
       }
       }
@@ -1080,7 +1071,7 @@ public class FSImage extends Storage {
         moveCurrent(sd);
         moveCurrent(sd);
       } catch(IOException ie) {
       } catch(IOException ie) {
         LOG.error("Unable to move current for " + sd.getRoot(), ie);
         LOG.error("Unable to move current for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
       }
     }
     }
 
 
@@ -1092,7 +1083,7 @@ public class FSImage extends Storage {
         saveCurrent(sd);
         saveCurrent(sd);
       } catch(IOException ie) {
       } catch(IOException ie) {
         LOG.error("Unable to save image for " + sd.getRoot(), ie);
         LOG.error("Unable to save image for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
       }
     }
     }
 
 
@@ -1114,7 +1105,7 @@ public class FSImage extends Storage {
         saveCurrent(sd);
         saveCurrent(sd);
       } catch(IOException ie) {
       } catch(IOException ie) {
         LOG.error("Unable to save edits for " + sd.getRoot(), ie);
         LOG.error("Unable to save edits for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
       }
     }
     }
     // mv lastcheckpoint.tmp -> previous.checkpoint
     // mv lastcheckpoint.tmp -> previous.checkpoint
@@ -1124,7 +1115,7 @@ public class FSImage extends Storage {
         moveLastCheckpoint(sd);
         moveLastCheckpoint(sd);
       } catch(IOException ie) {
       } catch(IOException ie) {
         LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie);
         LOG.error("Unable to move last checkpoint for " + sd.getRoot(), ie);
-        processIOError(sd.getRoot());
+        removeStorageDir(sd.getRoot());
       }
       }
     }
     }
     if(!editLog.isOpen()) editLog.open();
     if(!editLog.isOpen()) editLog.open();
@@ -1433,8 +1424,8 @@ public class FSImage extends Storage {
     if (!editLog.existsNew()) {
     if (!editLog.existsNew()) {
       throw new IOException("New Edits file does not exist");
       throw new IOException("New Edits file does not exist");
     }
     }
-    for (Iterator<StorageDirectory> it = 
-                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+    Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       if (!ckpt.exists()) {
       if (!ckpt.exists()) {
@@ -1447,8 +1438,8 @@ public class FSImage extends Storage {
     //
     //
     // Renames new image
     // Renames new image
     //
     //
-    for (Iterator<StorageDirectory> it = 
-                       dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
+    it = dirIterator(NameNodeDirType.IMAGE);
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
       File curFile = getImageFile(sd, NameNodeFile.IMAGE);
       File curFile = getImageFile(sd, NameNodeFile.IMAGE);
@@ -1457,10 +1448,7 @@ public class FSImage extends Storage {
       if (!ckpt.renameTo(curFile)) {
       if (!ckpt.renameTo(curFile)) {
         curFile.delete();
         curFile.delete();
         if (!ckpt.renameTo(curFile)) {
         if (!ckpt.renameTo(curFile)) {
-          // Close edit stream, if this directory is also used for edits
-          if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-            editLog.processIOError(sd);
-        // add storage to the removed list
+          editLog.removeEditsForStorageDir(sd);
           removedStorageDirs.add(sd);
           removedStorageDirs.add(sd);
           it.remove();
           it.remove();
         }
         }
@@ -1473,8 +1461,8 @@ public class FSImage extends Storage {
     //
     //
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     this.checkpointTime = FSNamesystem.now();
     this.checkpointTime = FSNamesystem.now();
-    for (Iterator<StorageDirectory> it = 
-                           dirIterator(); it.hasNext();) {
+    it = dirIterator();
+    while (it.hasNext()) {
       StorageDirectory sd = it.next();
       StorageDirectory sd = it.next();
       // delete old edits if sd is the image only the directory
       // delete old edits if sd is the image only the directory
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
       if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
@@ -1488,12 +1476,9 @@ public class FSImage extends Storage {
       }
       }
       try {
       try {
         sd.write();
         sd.write();
-      } catch (IOException e) {
-        LOG.error("Cannot write file " + sd.getRoot(), e);
-        // Close edit stream, if this directory is also used for edits
-        if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
-          editLog.processIOError(sd);
-      //add storage to the removed list
+      } catch (IOException ioe) {
+        LOG.error("Cannot write file " + sd.getRoot(), ioe);
+        editLog.removeEditsForStorageDir(sd);
         removedStorageDirs.add(sd);
         removedStorageDirs.add(sd);
         it.remove();
         it.remove();
       }
       }

+ 1 - 1
src/hdfs/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -455,7 +455,7 @@ public class SecondaryNameNode implements Runnable {
     namenode.rollFsImage();
     namenode.rollFsImage();
     checkpointImage.endCheckpoint();
     checkpointImage.endCheckpoint();
 
 
-    LOG.warn("Checkpoint done. New Image Size: " 
+    LOG.info("Checkpoint done. New Image Size: "
               + checkpointImage.getFsImageName().length());
               + checkpointImage.getFsImageName().length());
   }
   }