
HADOOP-163. Cause datanodes that are unable to either read or write data to exit. Contributed by Hairong.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@409773 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 19 years ago
parent commit b8434e7a24

+ 5 - 0
CHANGES.txt

@@ -73,6 +73,11 @@ Trunk (unreleased)
     nodes.  This, together with HADOOP-195, greatly improves the
     performance of these transfers.  (omalley via cutting)
 
+20. HADOOP-163.  Cause datanodes that are unable to either read or
+    write data to exit, so that the namenode will no longer target
+    them for new blocks and will replicate their data on other nodes.
+    (Hairong Kuang via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

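A minimal, self-contained sketch of the fail-fast pattern this commit introduces: check the data directory on every pass of the service loop, report any failure to the namenode, then shut down. All names here (FailFastWorker, ErrorSink, report) are illustrative stand-ins, not Hadoop's own API:

    import java.io.File;
    import java.io.IOException;

    public class FailFastWorker {
        // Stand-in for the datanode's RPC link to the namenode.
        interface ErrorSink {
            void report(int errorCode, String msg) throws IOException;
        }

        static final int DISK_ERROR = 1;   // mirrors DatanodeProtocol.DISK_ERROR

        private volatile boolean shouldRun = true;
        private final File dataDir;
        private final ErrorSink namenode;

        FailFastWorker(File dataDir, ErrorSink namenode) {
            this.dataDir = dataDir;
            this.namenode = namenode;
        }

        void offerService() {
            try {
                while (shouldRun) {
                    checkDir(dataDir);     // throws on any read/write problem
                    // ... send heartbeat, block report, transfer blocks ...
                    Thread.sleep(3000);    // stand-in for HEARTBEAT_INTERVAL
                }
            } catch (InterruptedException ie) {
                // treated as a shutdown request in this sketch
            } catch (IOException e) {
                handleDiskError(e.getMessage());
            }
        }

        void handleDiskError(String msg) {
            try {
                namenode.report(DISK_ERROR, msg);  // best effort: tell the namenode
            } catch (IOException ignored) {
            }
            shouldRun = false;                     // then exit the service loop
        }

        static void checkDir(File dir) throws IOException {
            if (!dir.isDirectory() || !dir.canRead() || !dir.canWrite())
                throw new IOException("bad data dir: " + dir);
        }
    }

Once the datanode stops heartbeating, the namenode's expiry logic (see the FSNamesystem change below) removes it and re-replicates its blocks on other nodes.
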
+ 116 - 85
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -18,6 +18,7 @@ package org.apache.hadoop.dfs;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import java.io.*;
 import java.net.*;
@@ -153,6 +154,16 @@ public class DataNode implements FSConstants, Runnable {
         }
     }
 
+    void handleDiskError( String errMsgr ) {
+        LOG.warning( "Shuting down DataNode because "+errMsgr );
+        try {
+            namenode.errorReport(
+                    localName, DatanodeProtocol.DISK_ERROR, errMsgr);
+        } catch( IOException ignored) {              
+        }
+        shutdown();
+    }
+    
     /**
      * Main loop for the DataNode.  Runs until shutdown,
      * forever calling remote NameNode functions.
@@ -164,96 +175,110 @@ public class DataNode implements FSConstants, Runnable {
         //
         // Now loop for a long time....
         //
-        while (shouldRun) {
-            long now = System.currentTimeMillis();
 
-            //
-            // Every so often, send heartbeat or block-report
-            //
-            if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
+        try {
+            while (shouldRun) {
+                long now = System.currentTimeMillis();
+    
                 //
-                // All heartbeat messages include following info:
-                // -- Datanode name
-                // -- data transfer port
-                // -- Total capacity
-                // -- Bytes remaining
+                // Every so often, send heartbeat or block-report
                 //
-                BlockCommand cmd = namenode.sendHeartbeat(localName, 
-                        data.getCapacity(), data.getRemaining(), xmitsInProgress);
-                //LOG.info("Just sent heartbeat, with name " + localName);
-                lastHeartbeat = now;
-
-                if (cmd != null && cmd.transferBlocks()) {
+                if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
                     //
-                    // Send a copy of a block to another datanode
+                    // All heartbeat messages include the following info:
+                    // -- Datanode name
+                    // -- data transfer port
+                    // -- Total capacity
+                    // -- Bytes remaining
                     //
-                    Block blocks[] = cmd.getBlocks();
-                    DatanodeInfo xferTargets[][] = cmd.getTargets();
+                    BlockCommand cmd = namenode.sendHeartbeat(localName, 
+                            data.getCapacity(), data.getRemaining(), xmitsInProgress);
+                    //LOG.info("Just sent heartbeat, with name " + localName);
+                    lastHeartbeat = now;
+    
+                    if( cmd != null ) {
+                        data.checkDataDir();
+                        if (cmd.transferBlocks()) {
+                            //
+                            // Send a copy of a block to another datanode
+                            //
+                            Block blocks[] = cmd.getBlocks();
+                            DatanodeInfo xferTargets[][] = cmd.getTargets();
                         
-                    for (int i = 0; i < blocks.length; i++) {
-                        if (!data.isValidBlock(blocks[i])) {
-                            String errStr = "Can't send invalid block " + blocks[i];
-                            LOG.info(errStr);
-                            namenode.errorReport(localName, errStr);
-                            break;
-                        } else {
-                            if (xferTargets[i].length > 0) {
-                                LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
-                                new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+                            for (int i = 0; i < blocks.length; i++) {
+                                if (!data.isValidBlock(blocks[i])) {
+                                    String errStr = "Can't send invalid block " + blocks[i];
+                                    LOG.info(errStr);
+                                    namenode.errorReport(
+                                        localName, 
+                                        DatanodeProtocol.INVALID_BLOCK, 
+                                        errStr);
+                                    break;
+                                } else {
+                                    if (xferTargets[i].length > 0) {
+                                        LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+                                        new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+                                    }
+                                }
                             }
-                        }
+                         } else if (cmd.invalidateBlocks()) {
+                            //
+                            // Some local block(s) are obsolete and can be 
+                            // safely garbage-collected.
+                            //
+                            data.invalidate(cmd.getBlocks());
+                         }
                     }
-                } else if (cmd != null && cmd.invalidateBlocks()) {
-                    //
-                    // Some local block(s) are obsolete and can be 
-                    // safely garbage-collected.
-                    //
-                    data.invalidate(cmd.getBlocks());
                 }
-            }
-            
-            // send block report
-            if (now - lastBlockReport > blockReportInterval) {
-                //
-                // Send latest blockinfo report if timer has expired.
-                // Get back a list of local block(s) that are obsolete
-                // and can be safely GC'ed.
-                //
-                Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
-                data.invalidate(toDelete);
-                lastBlockReport = now;
-                continue;
-            }
-            
-            // check if there are newly received blocks
-            Block [] blockArray=null;
-            synchronized( receivedBlockList ) {
-                if (receivedBlockList.size() > 0) {
-                    //
-                    // Send newly-received blockids to namenode
-                    //
-                    blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
-                    receivedBlockList.removeAllElements();
+                
+                // send block report
+                if (now - lastBlockReport > blockReportInterval) {
+                    // before sending the block report, check that the data directory is healthy
+                    data.checkDataDir();
+                    
+                     //
+                     // Send latest blockinfo report if timer has expired.
+                     // Get back a list of local block(s) that are obsolete
+                     // and can be safely GC'ed.
+                     //
+                     Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
+                     data.invalidate(toDelete);
+                     lastBlockReport = now;
+                     continue;
                 }
-            }
-            if( blockArray != null ) {
-                namenode.blockReceived(localName, blockArray);
-            }
-            
-            //
-            // There is no work to do;  sleep until hearbeat timer elapses, 
-            // or work arrives, and then iterate again.
-            //
-            long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
-            synchronized( receivedBlockList ) {
-                if (waitTime > 0 && receivedBlockList.size() == 0) {
-                    try {
-                        receivedBlockList.wait(waitTime);
-                    } catch (InterruptedException ie) {
+                
+                // check if there are newly received blocks
+                Block [] blockArray=null;
+                synchronized( receivedBlockList ) {
+                    if (receivedBlockList.size() > 0) {
+                        //
+                        // Send newly-received blockids to namenode
+                        //
+                        blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
+                        receivedBlockList.removeAllElements();
                     }
                 }
-            } // synchronized
-        } // while (shouldRun)
+                if( blockArray != null ) {
+                    namenode.blockReceived(localName, blockArray);
+                }
+                
+                //
+                // There is no work to do;  sleep until heartbeat timer elapses, 
+                // or work arrives, and then iterate again.
+                //
+                long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
+                synchronized( receivedBlockList ) {
+                    if (waitTime > 0 && receivedBlockList.size() == 0) {
+                        try {
+                            receivedBlockList.wait(waitTime);
+                        } catch (InterruptedException ie) {
+                        }
+                    }
+                } // synchronized
+            } // while (shouldRun)
+        } catch(DiskErrorException e) {
+            handleDiskError(e.getMessage());
+        }
     } // offerService
 
     /**
@@ -276,9 +301,14 @@ public class DataNode implements FSConstants, Runnable {
                 while (shouldListen) {
                     Socket s = ss.accept();
                     //s.setSoTimeout(READ_TIMEOUT);
+                    data.checkDataDir();
                     new Daemon(new DataXceiver(s)).start();
                 }
                 ss.close();
+            } catch (DiskErrorException de ) {
+                String errMsgr = de.getMessage();
+                LOG.warning("Exiting DataXceiveServer due to "+ errMsgr );
+                handleDiskError(errMsgr);
             } catch (IOException ie) {
                 LOG.info("Exiting DataXceiveServer due to " + ie.toString());
             }
@@ -791,6 +821,7 @@ public class DataNode implements FSConstants, Runnable {
     }
   }
 
+
   /**
    * Make an instance of DataNode after ensuring that given data directory
    * (and parent directories, if necessary) can be created.
@@ -803,14 +834,14 @@ public class DataNode implements FSConstants, Runnable {
   static DataNode makeInstanceForDir(String dataDir, Configuration conf) throws IOException {
     DataNode dn = null;
     File data = new File(dataDir);
-    data.mkdirs();
-    if (!data.isDirectory()) {
-      LOG.warning("Can't start DataNode in non-directory: "+dataDir);
-      return null;
-    } else {
-      dn = new DataNode(conf, dataDir);
+    try {
+        DiskChecker.checkDir( data );
+        dn = new DataNode(conf, dataDir);
+        return dn;
+    } catch( DiskErrorException e ) {
+        LOG.warning("Can't start DataNode because " + e.getMessage() );
+        return null;
     }
-    return dn;
   }
 
   public String toString() {

+ 4 - 1
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -28,6 +28,9 @@ import java.io.*;
  * @author Michael Cafarella
  **********************************************************************/
 interface DatanodeProtocol {
+    // error code
+    final static int DISK_ERROR = 1;
+    final static int INVALID_BLOCK = 2;
     /**
      * sendHeartbeat() tells the NameNode that the DataNode is still
      * alive and well.  Includes some status info, too. 
@@ -60,5 +63,5 @@ interface DatanodeProtocol {
      * errorReport() tells the NameNode about something that has gone
      * awry.  Useful for debugging.
      */
-    public void errorReport(String sender, String msg) throws IOException;
+    public void errorReport(String sender, int errorCode, String msg) throws IOException;
 }

+ 29 - 1
src/java/org/apache/hadoop/dfs/FSDataset.java

@@ -19,6 +19,8 @@ import java.io.*;
 import java.util.*;
 
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.conf.*;
 
 /**************************************************
@@ -167,7 +169,22 @@ class FSDataset implements FSConstants {
             blkid = blkid >> ((15 - halfByteIndex) * 4);
             return (int) ((0x000000000000000F) & blkid);
         }
-
+        
+        /**
+         * recursively check if the data directory tree is healthy
+         * @throws DiskErrorException
+         * @author hairong
+         */
+        public void checkDirTree() throws DiskErrorException {
+            DiskChecker.checkDir(dir);
+            
+            if (children != null) {
+                for (int i = 0; i < children.length; i++) {
+                    children[i].checkDirTree();
+                }
+            }
+        }
+        
         public String toString() {
           return "FSDir{" +
               "dir=" + dir +
@@ -422,6 +439,17 @@ class FSDataset implements FSConstants {
         return new File(tmp, b.getBlockName());
     }
 
+    /**
+     * check if the data directory is healthy
+     * @throws DiskErrorException
+     * @author hairong
+     */
+    void checkDataDir() throws DiskErrorException {
+        dirTree.checkDirTree();
+        DiskChecker.checkDir( tmp );
+    }
+    
+
     public String toString() {
       return "FSDataset{" +
         "dirpath='" + diskUsage.getDirPath() + "'" +

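FSDir.checkDirTree walks the block directory tree recursively and fails fast on the first unhealthy directory. Below is a standalone sketch of the same recursion over plain java.io.File; the DirTreeCheck class is hypothetical:

    import java.io.File;
    import java.io.IOException;

    public class DirTreeCheck {
        // Fail fast on the first directory that cannot be read or written.
        static void checkTree(File dir) throws IOException {
            if (!dir.isDirectory() || !dir.canRead() || !dir.canWrite())
                throw new IOException("unhealthy directory: " + dir);
            File[] children = dir.listFiles();
            if (children != null) {
                for (File child : children) {
                    if (child.isDirectory()) {
                        checkTree(child);   // same shape as FSDir.checkDirTree
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException {
            checkTree(new File(args.length > 0 ? args[0] : "."));
            System.out.println("directory tree is healthy");
        }
    }

checkDataDir then simply combines this tree walk with a check of the tmp directory used for blocks being written.
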
+ 40 - 19
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1026,7 +1026,46 @@ class FSNamesystem implements FSConstants {
             }
         }
     }
+    
+    /**
+     * remove a datanode's info
+     * @param name datanode name
+     * @author hairong
+     */
 
+    synchronized void rmDataNodeByName( UTF8 name ) {
+        DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
+        if (nodeInfo != null) {
+            rmDataNode( nodeInfo );
+        } else {
+            NameNode.stateChangeLog.warning("BLOCK* NameSystem.rmDataNodeByName: "
+                    + name + " does not exist");
+        }
+    }
+    
+    /**
+     * remove a datanode's info
+     * @param nodeInfo datanode info
+     * @author hairong
+     */
+    private synchronized void rmDataNode( DatanodeInfo nodeInfo ) {
+        heartbeats.remove( nodeInfo );
+        synchronized (datanodeMap) {
+            datanodeMap.remove(nodeInfo.getName());
+            NameNode.stateChangeLog.finer("BLOCK* NameSystem.rmDataNode: "
+                    + nodeInfo.getName() + " is removed from datanodeMap");
+        }
+        totalCapacity -= nodeInfo.getCapacity();
+        totalRemaining -= nodeInfo.getRemaining();
+
+        Block deadblocks[] = nodeInfo.getBlocks();
+        if (deadblocks != null) {
+            for (int i = 0; i < deadblocks.length; i++) {
+                removeStoredBlock(deadblocks[i], nodeInfo);
+            }
+        }
+    }
+        
     /**
      * Check if there are any expired heartbeats, and if so,
      * whether any blocks have to be re-replicated.
@@ -1038,27 +1077,9 @@ class FSNamesystem implements FSConstants {
             while ((heartbeats.size() > 0) &&
                    ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
                    (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
-                heartbeats.remove(nodeInfo);
                 NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
                            + "lost heartbeat from " + nodeInfo.getName());
-                synchronized (datanodeMap) {
-                    datanodeMap.remove(nodeInfo.getName());
-                    NameNode.stateChangeLog.finer("BLOCK* NameSystem.heartbeatCheck: "
-                            + nodeInfo.getName() + " is removed from datanodeMap");
-                }
-                totalCapacity -= nodeInfo.getCapacity();
-                totalRemaining -= nodeInfo.getRemaining();
-
-                Block deadblocks[] = nodeInfo.getBlocks();
-                if (deadblocks != null) {
-                    for (int i = 0; i < deadblocks.length; i++) {
-                        removeStoredBlock(deadblocks[i], nodeInfo);
-                    }
-                }
-
-                if (heartbeats.size() > 0) {
-                    nodeInfo = (DatanodeInfo) heartbeats.first();
-                }
+                rmDataNode(nodeInfo);
             }
         }
     }

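The FSNamesystem change is largely a refactor: the removal logic that used to live inline in heartbeatCheck is extracted into rmDataNode so that heartbeat expiry and the new explicit DISK_ERROR report share one code path. A schematic of that shape, with hypothetical names:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;

    // Schematic only: heartbeat expiry and explicit error reports
    // both funnel into a single removal routine.
    public class NodeRegistry {
        private final Map<String, Long> lastSeen = new HashMap<String, Long>();

        synchronized void heartbeatCheck(long expireMillis) {
            long cutoff = System.currentTimeMillis() - expireMillis;
            for (String node : new ArrayList<String>(lastSeen.keySet())) {
                if (lastSeen.get(node) < cutoff) {
                    remove(node);              // expiry path
                }
            }
        }

        synchronized void errorReport(String node) {
            if (lastSeen.containsKey(node)) {
                remove(node);                  // explicit DISK_ERROR path
            }
        }

        private void remove(String node) {
            lastSeen.remove(node);
            // ... release the node's capacity and re-replicate its blocks ...
        }
    }
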
+ 6 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -445,9 +445,13 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
 
     /**
      */
-    public void errorReport(String sender, String msg) {
+    public void errorReport(String sender, int errorCode, String msg) {
         // Log error message from datanode
-        //LOG.info("Report from " + sender + ": " + msg);
+        LOG.warning("Report from " + sender + ": " + msg);
+        if( errorCode == DatanodeProtocol.DISK_ERROR ) {
+            namesystem.rmDataNodeByName(new UTF8(sender));            
+        }
+            
     }
 
     /**

+ 37 - 0
src/java/org/apache/hadoop/util/DiskChecker.java

@@ -0,0 +1,37 @@
+package org.apache.hadoop.util;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Class that provides utility functions for checking disk problems
+ * @author Hairong Kuang
+ */
+
+public class DiskChecker {
+
+    public static class DiskErrorException extends IOException {
+      DiskErrorException(String msg) {
+        super(msg);
+      }
+    }
+    
+    public static void checkDir( File dir ) throws DiskErrorException {
+        if( !dir.exists() && !dir.mkdirs() )
+            throw new DiskErrorException( "can not create directory: " 
+                    + dir.toString() );
+        
+        if ( !dir.isDirectory() )
+            throw new DiskErrorException( "not a directory: " 
+                    + dir.toString() );
+            
+        if( !dir.canRead() )
+            throw new DiskErrorException( "directory is not readable: " 
+                    + dir.toString() );
+            
+        if( !dir.canWrite() )
+            throw new DiskErrorException( "directory is not writable: " 
+                    + dir.toString() );
+    }
+
+}
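
A usage sketch for the new utility; the try/catch mirrors how the DataNode reacts to a failed check, though the DiskCheckDemo class itself is hypothetical:

    import java.io.File;

    import org.apache.hadoop.util.DiskChecker;
    import org.apache.hadoop.util.DiskChecker.DiskErrorException;

    public class DiskCheckDemo {
        public static void main(String[] args) {
            File dataDir = new File(args.length > 0 ? args[0] : "/tmp/dfs/data");
            try {
                DiskChecker.checkDir(dataDir);   // also creates the directory if absent
                System.out.println(dataDir + " is healthy");
            } catch (DiskErrorException e) {
                // a real DataNode would call handleDiskError() and shut down here
                System.err.println("disk error: " + e.getMessage());
            }
        }
    }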