
HADOOP-124. Change DFS so that datanodes are identified by a persistent ID rather than by host and port. Contributed by Konstantin.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@410635 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
8fe5a30ca9

+ 5 - 0
CHANGES.txt

@@ -85,6 +85,11 @@ Trunk (unreleased)
     is closed, rather than as each block is written.
     (Milind Bhandarkar via cutting)
 
+23. HADOOP-124. Change DFS so that datanodes are identified by a
+    persistent ID rather than by host and port.  This solves a number
+    of filesystem integrity problems when, e.g., datanodes are
+    restarted.  (Konstantin Shvachko via cutting)
+
 
 Release 0.2.1 - 2006-05-12
 

+ 14 - 0
src/java/org/apache/hadoop/dfs/AlreadyBeingCreatedException.java

@@ -0,0 +1,14 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a client asks to create a file that is already being
+ * created but has not yet been closed.
+ * @author Owen O'Malley
+ */
+public class AlreadyBeingCreatedException extends IOException {
+  public AlreadyBeingCreatedException(String msg) {
+    super(msg);
+  }
+}
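
Because RPC failures cross the wire as org.apache.hadoop.ipc.RemoteException, a client detects this condition by comparing class names rather than with instanceof. A minimal sketch of that check, assuming it lives in the org.apache.hadoop.dfs package and relying on the same getClassName() accessor the DFSClient hunk below uses:

    import org.apache.hadoop.ipc.RemoteException;

    class AlreadyBeingCreatedCheck {
        // True when the server-side failure was an AlreadyBeingCreatedException;
        // the client should give up instead of retrying the create().
        static boolean isAlreadyBeingCreated(RemoteException e) {
            return AlreadyBeingCreatedException.class.getName()
                    .equals(e.getClassName());
        }
    }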

+ 14 - 1
src/java/org/apache/hadoop/dfs/BlockCommand.java

@@ -40,12 +40,14 @@ class BlockCommand implements Writable {
   
     boolean transferBlocks = false;
     boolean invalidateBlocks = false;
+    boolean shutdown = false;
     Block blocks[];
     DatanodeInfo targets[][];
 
     public BlockCommand() {
         this.transferBlocks = false;
-        this.invalidateBlocks = false;        
+        this.invalidateBlocks = false;
+        this.shutdown = false;
         this.blocks = new Block[0];
         this.targets = new DatanodeInfo[0][];
     }
@@ -53,6 +55,7 @@ class BlockCommand implements Writable {
     public BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
         this.transferBlocks = true;
         this.invalidateBlocks = false;
+        this.shutdown = false;
         this.blocks = blocks;
         this.targets = targets;
     }
@@ -60,10 +63,16 @@ class BlockCommand implements Writable {
     public BlockCommand(Block blocks[]) {
         this.transferBlocks = false;
         this.invalidateBlocks = true;
+        this.shutdown = false;
         this.blocks = blocks;
         this.targets = new DatanodeInfo[0][];
     }
 
+    public BlockCommand( boolean doShutdown ) {
+      this();
+      this.shutdown = doShutdown;
+    }
+
     public boolean transferBlocks() {
         return transferBlocks;
     }
@@ -72,6 +81,10 @@ class BlockCommand implements Writable {
         return invalidateBlocks;
     }
     
+    public boolean shutdownNode() {
+        return shutdown;
+    }
+
     public Block[] getBlocks() {
         return blocks;
     }
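
The three flags are mutually exclusive views of one command object, and a datanode is expected to test them in turn. A hedged sketch of that dispatch, mirroring the offerService() changes in DataNode.java further down (the handler bodies are placeholders, and the sketch assumes the same package since both classes are package-private):

    class BlockCommandDispatch {
        // Placeholder dispatch over the command flags; the real handling
        // lives in DataNode.offerService() in this commit.
        static void dispatch(DataNode node, BlockCommand cmd) {
            if (cmd == null) return;
            if (cmd.transferBlocks()) {
                // copy cmd.getBlocks()[i] to the datanodes in cmd.getTargets()[i]
            } else if (cmd.invalidateBlocks()) {
                // delete the obsolete local replicas listed in cmd.getBlocks()
            } else if (cmd.shutdownNode()) {
                node.shutdown();   // the namenode told this datanode to stop
            }
        }
    }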

+ 6 - 10
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -375,11 +375,7 @@ class DFSClient implements FSConstants {
             if (deadNodes.contains(nodes[i])) {
                 continue;
             }
-            String nodename = nodes[i].getName().toString();
-            int colon = nodename.indexOf(':');
-            if (colon >= 0) {
-                nodename = nodename.substring(0, colon);
-            }
+            String nodename = nodes[i].getHost();
             if (localName.equals(nodename)) {
                 chosenNode = nodes[i];
                 break;
@@ -524,7 +520,7 @@ class DFSClient implements FSConstants {
 
                 try {
                     chosenNode = bestNode(nodes[targetBlock], deadNodes);
-                    targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString());
+                    targetAddr = DataNode.createSocketAddr(chosenNode.getName());
                 } catch (IOException ie) {
                     String blockInfo =
                       blocks[targetBlock]+" file="+src+" offset="+target;
@@ -758,12 +754,12 @@ class DFSClient implements FSConstants {
                 //
                 // Connect to first DataNode in the list.  Abort if this fails.
                 //
-                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());
+                InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
                 try {
                     s = new Socket();
                     s.connect(target, READ_TIMEOUT);
                     s.setSoTimeout(replication * READ_TIMEOUT);
-                    datanodeName = nodes[0].getName().toString();
+                    datanodeName = nodes[0].getName();
                 } catch (IOException ie) {
                     // Connection failed.  Let's wait a little bit and retry
                     try {
@@ -811,7 +807,7 @@ class DFSClient implements FSConstants {
                     localName, overwrite, replication, blockSize);
               } catch (RemoteException e) {
                 if (--retries == 0 || 
-                    "org.apache.hadoop.dfs.NameNode.AlreadyBeingCreatedException".
+                    AlreadyBeingCreatedException.class.getName().
                         equals(e.getClassName())) {
                   throw e;
                 } else {
@@ -841,7 +837,7 @@ class DFSClient implements FSConstants {
                                          clientName.toString());
               } catch (RemoteException e) {
                 if (--retries == 0 || 
-                    "org.apache.hadoop.dfs.NameNode.NotReplicatedYetException".
+                    NotReplicatedYetException.class.getName().
                         equals(e.getClassName())) {
                   throw e;
                 } else {
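
Both hunks above are instances of the same bounded-retry idiom: retry the namenode RPC a few times, but rethrow immediately once the attempts run out or the server reports a failure the client cannot recover from. A condensed sketch of the idiom (the Call interface is a stand-in for the RPC being retried, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;

    interface Call { void run() throws IOException; }

    class RpcRetry {
        static void retry(Call call, String fatalClass, int retries)
                throws IOException {
            while (true) {
                try {
                    call.run();
                    return;
                } catch (RemoteException e) {
                    // give up when attempts are exhausted, or when the
                    // failure is the named unrecoverable exception
                    if (--retries == 0 || fatalClass.equals(e.getClassName())) {
                        throw e;
                    }
                }
            }
        }
    }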

+ 2 - 2
src/java/org/apache/hadoop/dfs/DFSck.java

@@ -266,7 +266,7 @@ public class DFSck {
 
         try {
             chosenNode = bestNode(lblock.getLocations(), deadNodes);
-            targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString());
+            targetAddr = DataNode.createSocketAddr(chosenNode.getName());
         } catch (IOException ie) {
             if (failures >= DFSClient.MAX_BLOCK_ACQUIRE_FAILURES) {
                 throw new IOException("Could not obtain block " + lblock);
@@ -358,7 +358,7 @@ public class DFSck {
           if (deadNodes.contains(nodes[i])) {
               continue;
           }
-          String nodename = nodes[i].getName().toString();
+          String nodename = nodes[i].getName();
           int colon = nodename.indexOf(':');
           if (colon >= 0) {
               nodename = nodename.substring(0, colon);

+ 192 - 143
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import java.io.*;
 import java.net.*;
+import java.nio.channels.FileLock;
 import java.util.*;
 import java.util.logging.*;
 
@@ -59,7 +60,7 @@ import java.util.logging.*;
  **********************************************************/
 public class DataNode implements FSConstants, Runnable {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.DataNode");
-  //
+    //
     // REMIND - mjc - I might bring "maxgigs" back so user can place 
     // artificial  limit on space
     //private static final long GIGABYTE = 1024 * 1024 * 1024;
@@ -87,12 +88,13 @@ public class DataNode implements FSConstants, Runnable {
     private static Vector subThreadList = null;
     DatanodeProtocol namenode;
     FSDataset data;
-    String localName;
+    DatanodeRegistration dnRegistration;
     boolean shouldRun = true;
     Vector receivedBlockList = new Vector();
     int xmitsInProgress = 0;
     Daemon dataXceiveServer = null;
     long blockReportInterval;
+    private DataStorage storage = null;
 
     /**
      * Create the DataNode given a configuration and a dataDir.
@@ -102,43 +104,80 @@ public class DataNode implements FSConstants, Runnable {
         this(InetAddress.getLocalHost().getHostName(), 
              new File(datadir),
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
+        // register datanode
+        register();
     }
 
     /**
      * A DataNode can also be created with configuration information
      * explicitly given.
+     * 
+     * @see DataStorage
      */
-    public DataNode(String machineName, File datadir, InetSocketAddress nameNodeAddr, Configuration conf) throws IOException {
-        this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, nameNodeAddr, conf);
-        this.data = new FSDataset(datadir, conf);
-
-        ServerSocket ss = null;
-        int tmpPort = conf.getInt("dfs.datanode.port", 50010);
-        while (ss == null) {
-            try {
-                ss = new ServerSocket(tmpPort);
-                LOG.info("Opened server at " + tmpPort);
-            } catch (IOException ie) {
-                LOG.info("Could not open server at " + tmpPort + ", trying new port");
-                tmpPort++;
-            }
+    private DataNode(String machineName, 
+                    File datadir, 
+                    InetSocketAddress nameNodeAddr, 
+                    Configuration conf ) throws IOException {
+      // get storage info and lock the data dir
+      storage = new DataStorage( datadir );
+      // connect to name node
+      this.namenode = (DatanodeProtocol) RPC.getProxy(DatanodeProtocol.class, 
+                                                      nameNodeAddr, 
+                                                      conf);
+      // find free port
+      ServerSocket ss = null;
+      int tmpPort = conf.getInt("dfs.datanode.port", 50010);
+      while (ss == null) {
+        try {
+          ss = new ServerSocket(tmpPort);
+          LOG.info("Opened server at " + tmpPort);
+        } catch (IOException ie) {
+          LOG.info("Could not open server at " + tmpPort + ", trying new port");
+          tmpPort++;
         }
-        this.localName = machineName + ":" + tmpPort;
-        this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
-        this.dataXceiveServer.start();
-
-        long blockReportIntervalBasis =
-          conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
-        this.blockReportInterval =
-          blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
+      }
+      // construct registration
+      this.dnRegistration = new DatanodeRegistration(
+                                        DFS_CURRENT_VERSION, 
+                                        machineName + ":" + tmpPort, 
+                                        storage.getStorageID(),
+                                        "" );
+      // initialize data node internal structure
+      this.data = new FSDataset(datadir, conf);
+      this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
+      this.dataXceiveServer.start();
+
+      long blockReportIntervalBasis =
+        conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
+      this.blockReportInterval =
+        blockReportIntervalBasis - new Random().nextInt((int)(blockReportIntervalBasis/10));
     }
 
     /**
      * Return the namenode's identifier
      */
     public String getNamenode() {
-        //return namenode.toString();
-	return "<namenode>";
+      //return namenode.toString();
+      return "<namenode>";
+    }
+
+    /**
+     * Register datanode
+     * <p>
+     * The datanode needs to register with the namenode on startup in order
+     * 1) to report which storage it is serving now and 
+     * 2) to receive a registrationID 
+     * issued by the namenode to recognize registered datanodes.
+     * 
+     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+     * @throws IOException
+     */
+    private void register() throws IOException {
+      dnRegistration = namenode.register( dnRegistration );
+      if( storage.getStorageID().equals("") ) {
+        storage.setStorageID( dnRegistration.getStorageID());
+        storage.write();
+      }
     }
 
     /**
@@ -152,13 +191,17 @@ public class DataNode implements FSConstants, Runnable {
             this.dataXceiveServer.join();
         } catch (InterruptedException ie) {
         }
+        try {
+          this.storage.close();
+        } catch (IOException ie) {
+        }
     }
 
     void handleDiskError( String errMsgr ) {
         LOG.warning( "Shuting down DataNode because "+errMsgr );
         try {
             namenode.errorReport(
-                    localName, DatanodeProtocol.DISK_ERROR, errMsgr);
+                    dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
         } catch( IOException ignored) {              
         }
         shutdown();
@@ -169,116 +212,122 @@ public class DataNode implements FSConstants, Runnable {
      * forever calling remote NameNode functions.
      */
     public void offerService() throws Exception {
-        long lastHeartbeat = 0, lastBlockReport = 0;
-        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
+      long lastHeartbeat = 0, lastBlockReport = 0;
+      LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec");
 
-        //
-        // Now loop for a long time....
-        //
+      //
+      // Now loop for a long time....
+      //
 
-        try {
-            while (shouldRun) {
-                long now = System.currentTimeMillis();
-    
+      try {
+        while (shouldRun) {
+          long now = System.currentTimeMillis();
+
+          //
+          // Every so often, send heartbeat or block-report
+          //
+          if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
+            //
+            // All heartbeat messages include following info:
+            // -- Datanode name
+            // -- data transfer port
+            // -- Total capacity
+            // -- Bytes remaining
+            //
+            BlockCommand cmd = namenode.sendHeartbeat(dnRegistration, 
+                                                      data.getCapacity(), 
+                                                      data.getRemaining(), 
+                                                      xmitsInProgress);
+            //LOG.info("Just sent heartbeat, with name " + localName);
+            lastHeartbeat = now;
+
+            if( cmd != null ) {
+              data.checkDataDir();
+              if (cmd.transferBlocks()) {
                 //
-                // Every so often, send heartbeat or block-report
+                // Send a copy of a block to another datanode
                 //
-                if (now - lastHeartbeat > HEARTBEAT_INTERVAL) {
-                    //
-                    // All heartbeat messages include following info:
-                    // -- Datanode name
-                    // -- data transfer port
-                    // -- Total capacity
-                    // -- Bytes remaining
-                    //
-                    BlockCommand cmd = namenode.sendHeartbeat(localName, 
-                            data.getCapacity(), data.getRemaining(), xmitsInProgress);
-                    //LOG.info("Just sent heartbeat, with name " + localName);
-                    lastHeartbeat = now;
-    
-                    if( cmd != null ) {
-                        data.checkDataDir();
-                        if (cmd.transferBlocks()) {
-                            //
-                            // Send a copy of a block to another datanode
-                            //
-                            Block blocks[] = cmd.getBlocks();
-                            DatanodeInfo xferTargets[][] = cmd.getTargets();
-                        
-                            for (int i = 0; i < blocks.length; i++) {
-                                if (!data.isValidBlock(blocks[i])) {
-                                    String errStr = "Can't send invalid block " + blocks[i];
-                                    LOG.info(errStr);
-                                    namenode.errorReport(
-                                        localName, 
-                                        DatanodeProtocol.INVALID_BLOCK, 
-                                        errStr);
-                                    break;
-                                } else {
-                                    if (xferTargets[i].length > 0) {
-                                        LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
-                                        new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
-                                    }
-                                }
-                            }
-                         } else if (cmd.invalidateBlocks()) {
-                            //
-                            // Some local block(s) are obsolete and can be 
-                            // safely garbage-collected.
-                            //
-                            data.invalidate(cmd.getBlocks());
-                         }
-                    }
-                }
-                
-                // send block report
-                if (now - lastBlockReport > blockReportInterval) {
-                    // before send block report, check if data directory is healthy
-                    data.checkDataDir();
+                Block blocks[] = cmd.getBlocks();
+                DatanodeInfo xferTargets[][] = cmd.getTargets();
                     
-                     //
-                     // Send latest blockinfo report if timer has expired.
-                     // Get back a list of local block(s) that are obsolete
-                     // and can be safely GC'ed.
-                     //
-                     Block toDelete[] = namenode.blockReport(localName, data.getBlockReport());
-                     data.invalidate(toDelete);
-                     lastBlockReport = now;
-                     continue;
-                }
-                
-                // check if there are newly received blocks
-                Block [] blockArray=null;
-                synchronized( receivedBlockList ) {
-                    if (receivedBlockList.size() > 0) {
-                        //
-                        // Send newly-received blockids to namenode
-                        //
-                        blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
-                        receivedBlockList.removeAllElements();
+                for (int i = 0; i < blocks.length; i++) {
+                  if (!data.isValidBlock(blocks[i])) {
+                    String errStr = "Can't send invalid block " + blocks[i];
+                    LOG.info(errStr);
+                    namenode.errorReport( dnRegistration, 
+                                DatanodeProtocol.INVALID_BLOCK, 
+                                errStr);
+                    break;
+                  } else {
+                    if (xferTargets[i].length > 0) {
+                        LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+                        new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
                     }
+                  }
                 }
-                if( blockArray != null ) {
-                    namenode.blockReceived(localName, blockArray);
-                }
-                
+              } else if (cmd.invalidateBlocks()) {
                 //
-                // There is no work to do;  sleep until hearbeat timer elapses, 
-                // or work arrives, and then iterate again.
+                // Some local block(s) are obsolete and can be 
+                // safely garbage-collected.
                 //
-                long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
-                synchronized( receivedBlockList ) {
-                    if (waitTime > 0 && receivedBlockList.size() == 0) {
-                        try {
-                            receivedBlockList.wait(waitTime);
-                        } catch (InterruptedException ie) {
-                        }
-                    }
-                } // synchronized
-            } // while (shouldRun)
-        } catch(DiskErrorException e) {
-            handleDiskError(e.getMessage());
-        }
+                data.invalidate(cmd.getBlocks());
+              } else if( cmd.shutdownNode()) {
+                // shut down the data node
+                this.shutdown();
+                continue;
+              }
+            }
+          }
+            
+          // send block report
+          if (now - lastBlockReport > blockReportInterval) {
+            // before send block report, check if data directory is healthy
+            data.checkDataDir();
+                
+            //
+            // Send latest blockinfo report if timer has expired.
+            // Get back a list of local block(s) that are obsolete
+            // and can be safely GC'ed.
+            //
+            Block toDelete[] = namenode.blockReport(dnRegistration,
+                                                    data.getBlockReport());
+            data.invalidate(toDelete);
+            lastBlockReport = now;
+            continue;
+          }
+            
+          // check if there are newly received blocks
+          Block [] blockArray=null;
+          synchronized( receivedBlockList ) {
+            if (receivedBlockList.size() > 0) {
+              //
+              // Send newly-received blockids to namenode
+              //
+              blockArray = (Block[]) receivedBlockList.toArray(new Block[receivedBlockList.size()]);
+              receivedBlockList.removeAllElements();
+            }
+          }
+          if( blockArray != null ) {
+            namenode.blockReceived( dnRegistration, blockArray );
+          }
+            
+          //
+          // There is no work to do; sleep until heartbeat timer elapses,
+          // or work arrives, and then iterate again.
+          //
+          long waitTime = HEARTBEAT_INTERVAL - (System.currentTimeMillis() - lastHeartbeat);
+          synchronized( receivedBlockList ) {
+            if (waitTime > 0 && receivedBlockList.size() == 0) {
+              try {
+                receivedBlockList.wait(waitTime);
+              } catch (InterruptedException ie) {
+              }
+            }
+          } // synchronized
+        } // while (shouldRun)
+      } catch(DiskErrorException e) {
+        handleDiskError(e.getMessage());
+      }
     } // offerService
 
     /**
@@ -503,7 +552,7 @@ public class DataNode implements FSConstants, Runnable {
               DataOutputStream out2 = null;
               if (targets.length > 1) {
                 // Connect to backup machine
-                mirrorNode = targets[1].getName().toString();
+                mirrorNode = targets[1].getName();
                 mirrorTarget = createSocketAddr(mirrorNode);
                 try {
                   Socket s2 = new Socket();
@@ -699,7 +748,7 @@ public class DataNode implements FSConstants, Runnable {
          * entire target list, the block, and the data.
          */
         public DataTransfer(DatanodeInfo targets[], Block b) throws IOException {
-            this.curTarget = createSocketAddr(targets[0].getName().toString());
+            this.curTarget = createSocketAddr(targets[0].getName());
             this.targets = targets;
             this.b = b;
             this.buf = new byte[BUFFER_SIZE];
@@ -709,7 +758,7 @@ public class DataNode implements FSConstants, Runnable {
          * Do the deed, write the bytes
          */
         public void run() {
-	    xmitsInProgress++;
+            xmitsInProgress++;
             try {
                 Socket s = new Socket();
                 s.connect(curTarget, READ_TIMEOUT);
@@ -750,8 +799,8 @@ public class DataNode implements FSConstants, Runnable {
             } catch (IOException ie) {
               LOG.log(Level.WARNING, "Failed to transfer "+b+" to "+curTarget, ie);
             } finally {
-		xmitsInProgress--;
-	    }
+              xmitsInProgress--;
+            }
         }
     }
 
@@ -821,7 +870,6 @@ public class DataNode implements FSConstants, Runnable {
     }
   }
 
-
   /**
    * Make an instance of DataNode after ensuring that given data directory
    * (and parent directories, if necessary) can be created.
@@ -847,17 +895,18 @@ public class DataNode implements FSConstants, Runnable {
   public String toString() {
     return "DataNode{" +
         "data=" + data +
-        ", localName='" + localName + "'" +
+        ", localName='" + dnRegistration.getName() + "'" +
+        ", storageID='" + dnRegistration.getStorageID() + "'" +
         ", xmitsInProgress=" + xmitsInProgress +
         "}";
   }
 
-    /**
-     */
-    public static void main(String args[]) throws IOException {
-        Configuration conf = new Configuration();
-        LogFormatter.setShowThreadIDs(true);
-        LogFormatter.initFileHandler(conf, "datanode");
-        runAndWait(conf);
-    }
+  /**
+   */
+  public static void main(String args[]) throws IOException {
+    Configuration conf = new Configuration();
+    LogFormatter.setShowThreadIDs(true);
+    LogFormatter.initFileHandler(conf, "datanode");
+    runAndWait(conf);
+  }
 }
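
The net effect of the new constructor and register() above: a datanode starting on a fresh data directory has an empty storageID, adopts the one the namenode issues during registration, and persists it so every restart presents the same identity. A condensed restatement of that flow, using only calls this commit introduces and assuming the same package:

    import java.io.IOException;

    class RegisterFlowSketch {
        static void register(DatanodeProtocol namenode,
                             DataStorage storage,
                             DatanodeRegistration dnRegistration)
                throws IOException {
            DatanodeRegistration reg = namenode.register(dnRegistration);
            if (storage.getStorageID().equals("")) {      // first run here
                storage.setStorageID(reg.getStorageID()); // adopt issued ID
                storage.write();                          // survive restarts
            }
        }
    }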

+ 148 - 0
src/java/org/apache/hadoop/dfs/DataStorage.java

@@ -0,0 +1,148 @@
+package org.apache.hadoop.dfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+
+import org.apache.hadoop.io.UTF8;
+
+/** 
+ * Data storage information file.
+ * <p>
+ * During startup the datanode reads its data storage file.
+ * The data storage file is stored in the dfs.data.dir directory.
+ * It contains version and storageID.
+ * The datanode holds a lock on the storage file while it runs so that
+ * other datanodes cannot start working with the same data storage.
+ * The lock is released when the datanode stops (normally or abnormally).
+ * 
+ * @author Konstantin Shvachko
+ */
+class DataStorage {
+  public static final String STORAGE_INFO_FILE_NAME = "storage";
+
+  // persistent fields
+  private int version = 0;  /// stored version
+  private String storageID; /// unique per cluster storageID
+  
+  // non persistent fields
+  private RandomAccessFile storageFile = null;
+  private FileLock storageLock = null;
+  
+  /**
+   * Create DataStorage and verify its version.
+   * 
+   * @param datadir data storage directory
+   * @throws IOException
+   */
+  public DataStorage( File datadir ) throws IOException {
+    this( DataNode.DFS_CURRENT_VERSION, datadir );
+    
+    if( version != DataNode.DFS_CURRENT_VERSION )
+      throw new IncorrectVersionException( version, "data storage" );
+  }
+  
+  /**
+   * Create DataStorage.
+   * 
+   * Read data storage file if exists or create it if not.
+   * Lock the file.
+   * 
+   * @param curVersion can be used to read file saved with a previous version.
+   * @param datadir data storage directory
+   * @throws IOException
+   */
+  public DataStorage( int curVersion, File datadir ) throws IOException {
+    this.version = curVersion;
+    storageFile = new RandomAccessFile( 
+                        new File(datadir, STORAGE_INFO_FILE_NAME ), 
+                        "rws" );
+    lock();
+    boolean needToSave;
+    try {
+      needToSave = read();
+    } catch( java.io.EOFException e ) {
+      storageID = "";
+      needToSave = true;
+    }
+    
+    if( needToSave )
+      write();
+  }
+  
+  public int getVersion() {
+    return version;
+  }
+
+  public String getStorageID() {
+    return storageID;
+  }
+  
+  public void setStorageID( String newStorageID ) {
+    this.storageID = newStorageID;
+  }
+  
+  public void setVersion( int newVersion ) {
+    this.version = newVersion;
+  }
+  
+  /**
+   * Lock datastoarge file.
+   * 
+   * @throws IOException
+   */
+  public void lock() throws IOException {
+    storageLock = storageFile.getChannel().tryLock();
+    if( storageLock == null )
+      throw new IOException( "Cannot start multiple Datanode instances "
+                              + "sharing the same data directory.\n" 
+                              + STORAGE_INFO_FILE_NAME + " is locked. ");
+  }
+  
+  /**
+   * Unlock the data storage file.
+   * 
+   * @throws IOException
+   */
+  public void unlock() throws IOException {
+    storageLock.release();
+  }
+  
+  /**
+   * Close the data storage file.
+   * 
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    storageLock.release();
+    storageFile.close();
+  }
+  
+  /**
+   * Read data storage file.
+   * 
+   * @return whether the data storage file needs to be updated.
+   * @throws IOException
+   */
+  public boolean read() throws IOException {
+    storageFile.seek(0);
+    this.version = storageFile.readInt();
+    UTF8 uID = new UTF8();
+    uID.readFields( storageFile );
+    this.storageID = uID.toString();
+    return false;
+  }
+
+  /**
+   * Write data storage file.
+   * 
+   * @throws IOException
+   */
+  public void write() throws IOException {
+    storageFile.seek(0);
+    storageFile.writeInt( this.version );
+    UTF8 uID = new UTF8( this.storageID );
+    uID.write( storageFile );
+  }
+}
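
The exclusion DataStorage provides comes entirely from FileChannel.tryLock(), which returns null when another process already holds the lock instead of blocking. A standalone sketch of that idiom (the file name and messages are illustrative only):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileLock;

    class StorageLockDemo {
        public static void main(String[] args) throws IOException {
            RandomAccessFile f = new RandomAccessFile(new File("storage"), "rws");
            FileLock lock = f.getChannel().tryLock();
            if (lock == null)   // somebody else is serving this directory
                throw new IOException("storage is locked by another datanode");
            try {
                // exclusive use of the data directory while the lock is held
            } finally {
                lock.release();  // the OS also drops it if the process dies
                f.close();
            }
        }
    }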

+ 55 - 0
src/java/org/apache/hadoop/dfs/DatanodeID.java

@@ -0,0 +1,55 @@
+package org.apache.hadoop.dfs;
+
+/**
+ * DatanodeID is composed of the datanode
+ * name (hostname:portNumber) and the ID of
+ * the data storage the node currently serves.
+ * 
+ * @author Konstantin Shvachko
+ */
+class DatanodeID {
+
+  protected String name;      /// hostname:portNumber
+  protected String storageID; /// unique per cluster storageID
+  
+  /**
+   * Create DatanodeID
+   * 
+   * @param nodeName (hostname:portNumber) 
+   * @param storageID data storage ID
+   */
+  public DatanodeID( String nodeName, String storageID ) {
+    this.name = nodeName;
+    this.storageID = storageID;
+  }
+  
+  /**
+   * @return hostname:portNumber.
+   */
+  public String getName() {
+    return name;
+  }
+  
+  /**
+   * @return data storage ID.
+   */
+  public String getStorageID() {
+    return this.storageID;
+  }
+
+  /**
+   * @return hostname without the :portNumber.
+   */
+  public String getHost() {
+    int colon = name.indexOf(":");
+    if (colon < 0) {
+      return name;
+    } else {
+      return name.substring(0, colon);
+    }
+  }
+  
+  public String toString() {
+    return name;
+  }
+}
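
A quick illustration of the name/host split the class encapsulates. The sketch assumes it sits in org.apache.hadoop.dfs, since DatanodeID is package-private, and the host name and storage ID are made up:

    class DatanodeIDDemo {
        public static void main(String[] args) {
            DatanodeID id = new DatanodeID("node7.example.com:50010", "DS-123");
            System.out.println(id.getName());  // node7.example.com:50010
            System.out.println(id.getHost());  // node7.example.com
        }
    }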

+ 25 - 37
src/java/org/apache/hadoop/dfs/DatanodeInfo.java

@@ -27,7 +27,7 @@ import java.util.*;
  *
  * @author Mike Cafarella
  **************************************************/
-class DatanodeInfo implements Writable, Comparable {
+public class DatanodeInfo extends DatanodeID implements Writable, Comparable {
 
     static {                                      // register a ctor
       WritableFactories.setFactory
@@ -37,30 +37,36 @@ class DatanodeInfo implements Writable, Comparable {
          });
     }
 
-    private UTF8 name;
     private long capacityBytes, remainingBytes, lastUpdate;
     private volatile TreeSet blocks;
 
     /** Create an empty DatanodeInfo.
      */
     public DatanodeInfo() {
-        this(new UTF8(), 0, 0);
+        this(new String(), new String(), 0, 0);
     }
 
+    public DatanodeInfo( DatanodeID nodeID ) {
+      this( nodeID.getName(), nodeID.getStorageID(), 0, 0);
+    }
+    
    /**
-    * @param name hostname:portNumber as UTF8 object.
+    * Create an empty DatanodeInfo.
     */
-    public DatanodeInfo(UTF8 name) {
-        this.name = name;
-        this.blocks = new TreeSet();
-        updateHeartbeat(0, 0);        
+    public DatanodeInfo(DatanodeID nodeID, 
+                        long capacity, 
+                        long remaining) {
+      this( nodeID.getName(), nodeID.getStorageID(), capacity, remaining );
     }
 
    /**
-    * @param name hostname:portNumber as UTF8 object.
+    * @param name hostname:portNumber as String object.
     */
-    public DatanodeInfo(UTF8 name, long capacity, long remaining) {
-        this.name = name;
+    public DatanodeInfo(String name, 
+                        String storageID, 
+                        long capacity, 
+                        long remaining) {
+        super( name, storageID );
         this.blocks = new TreeSet();
         updateHeartbeat(capacity, remaining);
     }
@@ -88,28 +94,6 @@ class DatanodeInfo implements Writable, Comparable {
         this.lastUpdate = System.currentTimeMillis();
     }
 
-    /**
-     * @return hostname:portNumber as UTF8 object.
-     */
-    public UTF8 getName() {
-        return name;
-    }
-
-    /**
-     * @return hostname and no :portNumber as UTF8 object.
-     */
-    public UTF8 getHost() {
-        String nameStr = name.toString();
-        int colon = nameStr.indexOf(":");
-        if (colon < 0) {
-            return name;
-        } else {
-            return new UTF8(nameStr.substring(0, colon));
-        }
-    }
-    public String toString() {
-        return name.toString();
-    }
     public Block[] getBlocks() {
         return (Block[]) blocks.toArray(new Block[blocks.size()]);
     }
@@ -127,7 +111,7 @@ class DatanodeInfo implements Writable, Comparable {
     }
 
   /** Comparable.
-   * Basis of compare is the UTF8 name (host:portNumber) only.
+   * Basis of compare is the String name (host:portNumber) only.
    * @param o
    * @return as specified by Comparable.
    */
@@ -142,7 +126,8 @@ class DatanodeInfo implements Writable, Comparable {
     /**
      */
     public void write(DataOutput out) throws IOException {
-        name.write(out);
+        new UTF8( this.name ).write(out);
+        new UTF8( this.storageID ).write(out);
         out.writeLong(capacityBytes);
         out.writeLong(remainingBytes);
         out.writeLong(lastUpdate);
@@ -158,8 +143,11 @@ class DatanodeInfo implements Writable, Comparable {
     /**
      */
     public void readFields(DataInput in) throws IOException {
-        this.name = new UTF8();
-        this.name.readFields(in);
+        UTF8 uStr = new UTF8();
+        uStr.readFields(in);
+        this.name = uStr.toString();
+        uStr.readFields(in);
+        this.storageID = uStr.toString();
         this.capacityBytes = in.readLong();
         this.remainingBytes = in.readLong();
         this.lastUpdate = in.readLong();

+ 25 - 9
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -28,9 +28,21 @@ import java.io.*;
  * @author Michael Cafarella
  **********************************************************************/
 interface DatanodeProtocol {
-    // error code
-    final static int DISK_ERROR = 1;
-    final static int INVALID_BLOCK = 2;
+  // error code
+  final static int DISK_ERROR = 1;
+  final static int INVALID_BLOCK = 2;
+  /** 
+   * Register Datanode.
+   *
+   * @see DataNode#register()
+   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+   * 
+   * @return updated {@link DatanodeRegistration}, which contains 
+   * new storageID if the datanode did not have one and
+   * registration ID for further communication.
+   */
+    public DatanodeRegistration register( DatanodeRegistration registration
+                                        ) throws IOException;
     /**
      * sendHeartbeat() tells the NameNode that the DataNode is still
      * alive and well.  Includes some status info, too. 
@@ -38,9 +50,9 @@ interface DatanodeProtocol {
      * A BlockCommand tells the DataNode to invalidate local block(s), 
      * or to copy them to other DataNodes, etc.
      */
-    public BlockCommand sendHeartbeat(String sender, 
-            long capacity, long remaining,
-            int xmitsInProgress) throws IOException;
+    public BlockCommand sendHeartbeat(DatanodeRegistration registration,
+                                      long capacity, long remaining,
+                                      int xmitsInProgress) throws IOException;
 
     /**
      * blockReport() tells the NameNode about all the locally-stored blocks.
@@ -49,7 +61,8 @@ interface DatanodeProtocol {
      * the locally-stored blocks.  It's invoked upon startup and then
      * infrequently afterwards.
      */
-    public Block[] blockReport(String sender, Block blocks[]) throws IOException;
+    public Block[] blockReport( DatanodeRegistration registration,
+                                Block blocks[]) throws IOException;
     
     /**
      * blockReceived() allows the DataNode to tell the NameNode about
@@ -57,11 +70,14 @@ interface DatanodeProtocol {
      * writes a new Block here, or another DataNode copies a Block to
      * this DataNode, it will call blockReceived().
      */
-    public void blockReceived(String sender, Block blocks[]) throws IOException;
+    public void blockReceived(DatanodeRegistration registration,
+                              Block blocks[]) throws IOException;
 
     /**
      * errorReport() tells the NameNode about something that has gone
      * awry.  Useful for debugging.
      */
-    public void errorReport(String sender, int errorCode, String msg) throws IOException;
+    public void errorReport(DatanodeRegistration registration,
+                            int errorCode, 
+                            String msg) throws IOException;
 }
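
Taken together, the new signatures make every call after register() carry the DatanodeRegistration instead of a bare sender string. A hedged sketch of the datanode side of one protocol cycle, in the handshake order this commit establishes (same-package assumption; the host name and capacity figures are placeholders):

    import java.io.IOException;

    class DatanodeHandshakeSketch {
        // 'namenode' is an RPC proxy to DatanodeProtocol; the block
        // arrays stand in for whatever the local FSDataset reports.
        static void oneCycle(DatanodeProtocol namenode, String storageID,
                             Block[] localBlocks, Block[] newlyReceived)
                throws IOException {
            DatanodeRegistration reg = new DatanodeRegistration(
                FSConstants.DFS_CURRENT_VERSION,
                "node7.example.com:50010", storageID, "");
            reg = namenode.register(reg);   // namenode may issue a storageID
            BlockCommand cmd = namenode.sendHeartbeat(reg, 1024L, 512L, 0);
            Block[] obsolete = namenode.blockReport(reg, localBlocks);
            namenode.blockReceived(reg, newlyReceived);
        }
    }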

+ 87 - 0
src/java/org/apache/hadoop/dfs/DatanodeRegistration.java

@@ -0,0 +1,87 @@
+package org.apache.hadoop.dfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/** 
+ * DatanodeRegistration contains all the information the Namenode needs
+ * to identify and verify a Datanode when it contacts the Namenode.
+ * This information is sent by Datanode with each communication request.
+ * 
+ * @author Konstantin Shvachko
+ */
+class DatanodeRegistration extends DatanodeID implements Writable {
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (DatanodeRegistration.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new DatanodeRegistration(); }
+       });
+  }
+
+  int version;            /// current Datanode version
+  String registrationID;  /// a unique per namenode id; indicates   
+                          /// the namenode the datanode is registered with
+
+  /**
+   * Default constructor.
+   */
+  public DatanodeRegistration() {
+    this( 0, null, null, null );
+  }
+  
+  /**
+   * Create DatanodeRegistration
+   */
+  public DatanodeRegistration(int version, 
+                              String nodeName, 
+                              String storageID,
+                              String registrationID ) {
+    super(nodeName, storageID);
+    this.version = version;
+    this.registrationID = registrationID;
+  }
+
+  /**
+   */
+  public int getVersion() {
+    return version;
+  }
+  
+  /**
+   */
+  public String getRegistrationID() {
+    return registrationID;
+  }
+
+  /////////////////////////////////////////////////
+  // Writable
+  /////////////////////////////////////////////////
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.version);
+    new UTF8( this.name ).write(out);
+    new UTF8( this.storageID ).write(out);
+    new UTF8( this.registrationID ).write(out);   
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.version = in.readInt();
+    UTF8 uStr = new UTF8();
+    uStr.readFields(in);
+    this.name = uStr.toString();
+    uStr.readFields(in);
+    this.storageID = uStr.toString();
+    uStr.readFields(in);
+    this.registrationID = uStr.toString();   
+  }
+}
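
Since the class is a Writable, a round trip through a byte buffer shows that version, name, storageID and registrationID all survive the wire. The values below are made up, and the sketch assumes the org.apache.hadoop.dfs package:

    import java.io.*;

    class RegistrationRoundTrip {
        public static void main(String[] args) throws IOException {
            DatanodeRegistration out = new DatanodeRegistration(
                -2, "node7.example.com:50010", "DS-42", "NS1234567");
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            out.write(new DataOutputStream(buf));

            DatanodeRegistration in = new DatanodeRegistration();
            in.readFields(new DataInputStream(
                new ByteArrayInputStream(buf.toByteArray())));
            System.out.println(in.getName() + " / " + in.getStorageID()
                + " / " + in.getRegistrationID());
        }
    }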

+ 3 - 3
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -133,7 +133,7 @@ public class DistributedFileSystem extends FileSystem {
         }
 
         DFSFileInfo info[] = dfs.listPaths(getPath(f));
-        return info[0].getLen();
+        return (info == null) ? 0 : info[0].getLen();
     }
 
     public short getReplication(Path f) throws IOException {
@@ -247,8 +247,8 @@ public class DistributedFileSystem extends FileSystem {
 
       for (int i = 0; i < dnReport.length; i++) {
         reports[i] = new DataNodeReport();
-        reports[i].name = dnReport[i].getName().toString();
-        reports[i].host = dnReport[i].getHost().toString();
+        reports[i].name = dnReport[i].getName();
+        reports[i].host = dnReport[i].getHost();
         reports[i].capacity = dnReport[i].getCapacity();
         reports[i].remaining = dnReport[i].getRemaining();
         reports[i].lastUpdate = dnReport[i].lastUpdate();

+ 6 - 2
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
  *
  * @author Mike Cafarella
  ************************************/
-interface FSConstants {
+public interface FSConstants {
     public static int MIN_BLOCKS_FOR_WRITE = 5;
 
     public static final long WRITE_COMPLETE = 0xcafae11a;
@@ -109,5 +109,9 @@ interface FSConstants {
     //TODO mb@media-style.com: should be conf injected?
     public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size", 4096);
 
+    // Version is reflected in the dfs image and edit log files.
+    // Version is reflected in the data storage file.
+    // Versions are negative.
+    // Decrement DFS_CURRENT_VERSION to define a new version.
+    public static final int DFS_CURRENT_VERSION = -2;
 }
-

+ 37 - 13
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -35,10 +35,6 @@ import org.apache.hadoop.fs.Path;
  * @author Mike Cafarella
  *************************************************/
 class FSDirectory implements FSConstants {
-    // Version is reflected in the dfs image and edit log files.
-    // Versions are negative. 
-    // Decrement DFS_CURRENT_VERSION to define a new version.
-    private static final int DFS_CURRENT_VERSION = -1;
     private static final String FS_IMAGE = "fsimage";
     private static final String NEW_FS_IMAGE = "fsimage.new";
     private static final String OLD_FS_IMAGE = "fsimage.old";
@@ -295,6 +291,7 @@ class FSDirectory implements FSConstants {
     TreeMap activeLocks = new TreeMap();
     DataOutputStream editlog = null;
     boolean ready = false;
+    int namespaceID = 0;  /// a persistent attribute of the namespace
 
     /** Access an existing dfs name directory. */
     public FSDirectory(File dir, Configuration conf) throws IOException {
@@ -329,6 +326,27 @@ class FSDirectory implements FSConstants {
           throw new IOException("Unable to format: "+dir);
         }
     }
+    
+    /**
+     * Generate new namespaceID.
+     * 
+     * namespaceID is a persistent attribute of the namespace.
+     * It is generated when the namenode is formatted and remains the same
+     * during the life cycle of the namenode.
+     * When datanodes register, they receive it as their registrationID,
+     * which is checked every time the datanode is communicating with the 
+     * namenode. Datanodes that do not 'know' the namespaceID are rejected.
+     * 
+     * @return new namespaceID
+     */
+    private int newNamespaceID() {
+      Random r = new Random();
+      r.setSeed( System.currentTimeMillis() );
+      int newID = 0;
+      while( newID == 0)
+        newID = r.nextInt();
+      return newID;
+    }
 
     /**
      * Shutdown the filestore
@@ -387,11 +405,16 @@ class FSDirectory implements FSConstants {
         //
         // Load in bits
         //
+        boolean needToSave = true;
+        int imgVersion = DFS_CURRENT_VERSION;
         if (curFile.exists()) {
             DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(curFile)));
             try {
-                // read image version
-                int imgVersion = in.readInt();
+                // read image version: first appeared in version -1
+                imgVersion = in.readInt();
+                // read namespaceID: first appeared in version -2
+                if( imgVersion <= -2 )
+                  namespaceID = in.readInt();
                 // read number of files
                 int numFiles = 0;
                 // version 0 does not store version #
@@ -402,6 +425,7 @@ class FSDirectory implements FSConstants {
                 } else 
                   numFiles = in.readInt();
                   
+                needToSave = ( imgVersion != DFS_CURRENT_VERSION );
                 if( imgVersion < DFS_CURRENT_VERSION ) // future version
                   throw new IOException(
                               "Unsupported version of the file system image: "
@@ -436,11 +460,10 @@ class FSDirectory implements FSConstants {
             }
         }
 
-        if (edits.exists() && loadFSEdits(edits, conf) > 0) {
-            return true;
-        } else {
-            return false;
-        }
+        if( namespaceID == 0 )
+          namespaceID = newNamespaceID();
+        
+        return needToSave || ( edits.exists() && loadFSEdits(edits, conf) > 0 );
     }
 
     /**
@@ -584,6 +607,7 @@ class FSDirectory implements FSConstants {
         DataOutputStream out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(newFile)));
         try {
             out.writeInt(DFS_CURRENT_VERSION);
+            out.writeInt(this.namespaceID);
             out.writeInt(rootDir.numItemsInTree() - 1);
             rootDir.saveImage("", out);
         } finally {
@@ -967,9 +991,9 @@ class FSDirectory implements FSConstants {
                 lastSuccess = false;
             }
         }
-        if( !lastSuccess )
+/*        if( !lastSuccess )
             NameNode.stateChangeLog.warning("DIR* FSDirectory.mkdirs: "
-                    +"failed to create directory "+src );
+                    +"failed to create directory "+src );*/
         return lastSuccess;
     }
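
The image-loading hunk above is the version-gating pattern that namespaceID relies on: a field is only consumed when the image version says it exists, and needToSave forces a re-save whenever the image was written by an older version. A minimal sketch of that read path (the helper name is hypothetical; the fields and constant are the ones the diff touches):

    import java.io.DataInputStream;
    import java.io.IOException;

    class ImageVersionGate {
        static int readNamespaceID(DataInputStream in) throws IOException {
            int imgVersion = in.readInt();   // version field: since -1
            if (imgVersion <= -2)            // namespaceID: since -2
                return in.readInt();
            return 0;                        // 0 means "assign a new one"
        }
    }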
 

+ 361 - 195
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -105,7 +105,7 @@ class FSNamesystem implements FSConstants {
             } else if (lu1 > lu2) {
                 return 1;
             } else {
-                return d1.getName().compareTo(d2.getName());
+                return d1.getStorageID().compareTo(d2.getStorageID());
             }
         }
     });
@@ -325,7 +325,7 @@ class FSNamesystem implements FSConstants {
             +src+" for "+holder+" at "+clientMachine);
       try {
         if (pendingCreates.get(src) != null) {
-           throw new NameNode.AlreadyBeingCreatedException(
+           throw new AlreadyBeingCreatedException(
                    "failed to create file " + src + " for " + holder +
                    " on client " + clientMachine + 
                    " because pendingCreates is non-null.");
@@ -410,10 +410,10 @@ class FSNamesystem implements FSConstants {
           (FileUnderConstruction) pendingCreates.get(src);
         // make sure that we still have the lease on this file
         if (pendingFile == null) {
-          throw new NameNode.LeaseExpiredException("No lease on " + src);
+          throw new LeaseExpiredException("No lease on " + src);
         }
         if (!pendingFile.getClientName().equals(clientName)) {
-          throw new NameNode.LeaseExpiredException("Lease mismatch on " + src + 
+          throw new LeaseExpiredException("Lease mismatch on " + src + 
               " owned by " + pendingFile.getClientName() + 
               " and appended by " + clientName);
         }
@@ -421,6 +421,12 @@ class FSNamesystem implements FSConstants {
           throw new IOException("File " + src + " created during write");
         }
 
+        //
+        // If we fail this, bad things happen!
+        //
+        if (!checkFileProgress(src)) {
+          throw new NotReplicatedYetException("Not replicated yet");
+        }
         
         // Get the array of replication targets 
         DatanodeInfo targets[] = chooseTargets(pendingFile.getReplication(), 
@@ -660,10 +666,10 @@ class FSNamesystem implements FSConstants {
                 if (containingNodes != null) {
                     for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
                         DatanodeInfo node = (DatanodeInfo) it.next();
-                        Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getName());
+                        Vector invalidateSet = (Vector) recentInvalidateSets.get(node.getStorageID());
                         if (invalidateSet == null) {
                             invalidateSet = new Vector();
-                            recentInvalidateSets.put(node.getName(), invalidateSet);
+                            recentInvalidateSets.put(node.getStorageID(), invalidateSet);
                         }
                         invalidateSet.add(b);
                         NameNode.stateChangeLog.finer("BLOCK* NameSystem.delete: "
@@ -755,7 +761,7 @@ class FSNamesystem implements FSConstants {
                 if (containingNodes != null) {
                   for (Iterator it =containingNodes.iterator(); it.hasNext();) {
                     DatanodeInfo cur = (DatanodeInfo) it.next();
-                    v.add(cur.getHost());
+                    v.add(new UTF8( cur.getHost() ));
                   }
                 }
                 hosts[i-startBlock] = (UTF8[]) v.toArray(new UTF8[v.size()]);
@@ -972,36 +978,150 @@ class FSNamesystem implements FSConstants {
     // These methods are called by datanodes
     //
     /////////////////////////////////////////////////////////
+    /**
+     * Register Datanode.
+     * <p>
+     * The purpose of registration is to determine whether the new datanode
+     * serves a new data storage, and will therefore report data block
+     * copies the namenode was not yet aware of, or whether it is a
+     * replacement node for a data storage previously served by a different
+     * or the same (in terms of host:port) datanode.
+     * The data storages are distinguished by their storageIDs. When a new
+     * data storage is reported the namenode issues a new unique storageID.
+     * <p>
+     * Finally, the namenode returns its namespaceID as the registrationID
+     * for the datanodes. 
+     * namespaceID is a persistent attribute of the name space.
+     * The registrationID is checked every time the datanode is communicating
+     * with the namenode. 
+     * Datanodes with inappropriate registrationID are rejected.
+     * If the namenode stops and then restarts, it restores its
+     * namespaceID and continues serving the datanodes that have previously
+     * registered with it, without restarting the whole cluster.
+     * 
+     * @see DataNode#register()
+     * @author Konstantin Shvachko
+     */
+    public synchronized void registerDatanode( DatanodeRegistration nodeReg 
+                                              ) throws IOException {
+      NameNode.stateChangeLog.fine(
+          "BLOCK* NameSystem.registerDatanode: "
+          + "node registration from " + nodeReg.getName()
+          + " storage " + nodeReg.getStorageID() );
+
+      nodeReg.registrationID = getRegistrationID();
+      DatanodeInfo nodeS = (DatanodeInfo)datanodeMap.get(nodeReg.getStorageID());
+      DatanodeInfo nodeN = getDatanodeByName( nodeReg.getName() );
+      
+      if( nodeN != null && nodeS != null && nodeN == nodeS ) {
+        // The same datanode has been just restarted to serve the same data 
+        // storage. We do not need to remove old data blocks, the delta will  
+        // be calculated on the next block report from the datanode
+        NameNode.stateChangeLog.fine(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "node restarted." );
+        return;
+      }
+      
+      if( nodeN != null ) {
+        // nodeN previously served a different data storage, 
+        // which is not served by anybody anymore.
+        removeDatanode( nodeN );
+        nodeN = null;
+      }
+      
+      // nodeN is not found
+      if( nodeS == null ) {
+        // this is a new datanode serving a new data storage
+        if( nodeReg.getStorageID().equals("") ) {
+          // this data storage has never registered
+          // it is either empty or was created by previous version of DFS
+          nodeReg.storageID = newStorageID();
+          NameNode.stateChangeLog.fine(
+              "BLOCK* NameSystem.registerDatanode: "
+              + "new storageID " + nodeReg.getStorageID() + " assigned." );
+        }
+        // register new datanode
+        datanodeMap.put(nodeReg.getStorageID(), 
+                        new DatanodeInfo( nodeReg ) );
+        NameNode.stateChangeLog.fine(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "node registered." );
+        return;
+      }
+
+      // nodeS is found
+      // The registering datanode is a replacement node for the existing 
+      // data storage, which from now on will be served by a new node.
+      NameNode.stateChangeLog.fine(
+          "BLOCK* NameSystem.registerDatanode: "
+          + "node " + nodeS.name
+          + " is replaced by " + nodeReg.getName() + "." );
+      nodeS.name = nodeReg.getName();
+      return;
+    }
+    
+    /**
+     * Get registrationID for datanodes based on the namespaceID.
+     * 
+     * @see #registerDatanode(DatanodeRegistration)
+     * @see FSDirectory#newNamespaceID()
+     * @return registration ID
+     */
+    public String getRegistrationID() {
+      return "NS" + Integer.toString( dir.namespaceID );
+    }
+    
+    /**
+     * Generate new storage ID.
+     * 
+     * @return unique storage ID
+     * 
+   * Note that collisions are still possible if somebody tries
+   * to bring in a data storage from a different cluster.
+     */
+    private String newStorageID() {
+      String newID = null;
+      while( newID == null ) {
+        newID = "DS" + Integer.toString( r.nextInt() );
+        if( datanodeMap.get( newID ) != null )
+          newID = null;
+      }
+      return newID;
+    }
+    
     /**
      * The given node has reported in.  This method should:
      * 1) Record the heartbeat, so the datanode isn't timed out
      * 2) Adjust usage stats for future block allocation
      */
-    public synchronized void gotHeartbeat(UTF8 name, long capacity, long remaining) {
-        synchronized (heartbeats) {
-            synchronized (datanodeMap) {
-                long capacityDiff = 0;
-                long remainingDiff = 0;
-                DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);
-
-                if (nodeinfo == null) {
-                    NameNode.stateChangeLog.fine("BLOCK* NameSystem.gotHeartbeat: "
-                            +"brand-new heartbeat from "+name );
-                     nodeinfo = new DatanodeInfo(name, capacity, remaining);
-                    datanodeMap.put(name, nodeinfo);
-                    capacityDiff = capacity;
-                    remainingDiff = remaining;
-                } else {
-                    capacityDiff = capacity - nodeinfo.getCapacity();
-                    remainingDiff = remaining - nodeinfo.getRemaining();
-                    heartbeats.remove(nodeinfo);
-                    nodeinfo.updateHeartbeat(capacity, remaining);
-                }
-                heartbeats.add(nodeinfo);
-                totalCapacity += capacityDiff;
-                totalRemaining += remainingDiff;
-            }
+    public synchronized void gotHeartbeat(DatanodeID nodeID,
+                                          long capacity, 
+                                          long remaining) throws IOException {
+      synchronized (heartbeats) {
+        synchronized (datanodeMap) {
+          long capacityDiff = 0;
+          long remainingDiff = 0;
+          DatanodeInfo nodeinfo = getDatanode( nodeID );
+
+          if (nodeinfo == null) {
+            NameNode.stateChangeLog.fine("BLOCK* NameSystem.gotHeartbeat: "
+                    +"brand-new heartbeat from "+nodeID.getName() );
+            nodeinfo = new DatanodeInfo(nodeID, capacity, remaining);
+            datanodeMap.put(nodeinfo.getStorageID(), nodeinfo);
+            capacityDiff = capacity;
+            remainingDiff = remaining;
+          } else {
+            capacityDiff = capacity - nodeinfo.getCapacity();
+            remainingDiff = remaining - nodeinfo.getRemaining();
+            heartbeats.remove(nodeinfo);
+            nodeinfo.updateHeartbeat(capacity, remaining);
+          }
+          heartbeats.add(nodeinfo);
+          totalCapacity += capacityDiff;
+          totalRemaining += remainingDiff;
         }
+      }
     }
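For example, a node that previously reported 100 GB capacity and 40 GB remaining and now reports 100 GB and 35 GB yields capacityDiff = 0 and remainingDiff = -5 GB, so only totalRemaining changes; a brand-new node contributes its full figures to both totals.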
 
     /**
@@ -1020,77 +1140,73 @@ class FSNamesystem implements FSConstants {
             }
         }
     }
-    
+
     /**
      * remove a datanode info
-     * @param name: datanode name
+     * @param nodeID: datanode ID
      * @author hairong
      */
-
-    synchronized void rmDataNodeByName( UTF8 name ) {
-        DatanodeInfo nodeInfo = (DatanodeInfo) datanodeMap.get(name);
-        if (nodeInfo != null) {
-            rmDataNode( nodeInfo );
-        } else {
-            NameNode.stateChangeLog.warning("BLOCK* NameSystem.rmDataNodeByName: "
-                    + nodeInfo.getName() + " does not exist");
-        }
+    public synchronized void removeDatanode( DatanodeID nodeID ) 
+    throws IOException {
+      DatanodeInfo nodeInfo = getDatanode( nodeID );
+      if (nodeInfo != null) {
+        removeDatanode( nodeInfo );
+      } else {
+          NameNode.stateChangeLog.warning("BLOCK* NameSystem.removeDatanode: "
+                  + nodeID.getName() + " does not exist");
+      }
+    }
+  
+    /**
+     * remove a datanode info
+     * @param nodeInfo: datanode info
+     * @author hairong
+     */
+    private void removeDatanode( DatanodeInfo nodeInfo ) {
+      heartbeats.remove(nodeInfo);
+      datanodeMap.remove(nodeInfo.getStorageID());
+      NameNode.stateChangeLog.finer("BLOCK* NameSystem.removeDatanode: "
+              + nodeInfo.getName() + " is removed from datanodeMap");
+      totalCapacity -= nodeInfo.getCapacity();
+      totalRemaining -= nodeInfo.getRemaining();
+
+      Block deadblocks[] = nodeInfo.getBlocks();
+      if( deadblocks != null )
+        for( int i = 0; i < deadblocks.length; i++ )
+          removeStoredBlock(deadblocks[i], nodeInfo);
     }
-    
-    /**
-     * remove a datanode info
-     * @param nodeInfo: datanode info
-     * @author hairong
-     */
-    private synchronized void rmDataNode( DatanodeInfo nodeInfo ) {
-        heartbeats.remove( nodeInfo );
-        synchronized (datanodeMap) {
-            datanodeMap.remove(nodeInfo.getName());
-            NameNode.stateChangeLog.finer("BLOCK* NameSystem.heartbeatCheck: "
-                    + nodeInfo.getName() + " is removed from datanodeMap");
-        }
-        totalCapacity -= nodeInfo.getCapacity();
-        totalRemaining -= nodeInfo.getRemaining();
 
-        Block deadblocks[] = nodeInfo.getBlocks();
-        if (deadblocks != null) {
-            for (int i = 0; i < deadblocks.length; i++) {
-                removeStoredBlock(deadblocks[i], nodeInfo);
-            }
-        }
-    }
-        
     /**
      * Check if there are any expired heartbeats, and if so,
      * whether any blocks have to be re-replicated.
      */
     synchronized void heartbeatCheck() {
-        synchronized (heartbeats) {
-            DatanodeInfo nodeInfo = null;
-
-            while ((heartbeats.size() > 0) &&
-                   ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
-                   (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
-                NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
-                           + "lost heartbeat from " + nodeInfo.getName());
-                rmDataNode(nodeInfo);
-            }
+      synchronized (heartbeats) {
+        DatanodeInfo nodeInfo = null;
+
+        while ((heartbeats.size() > 0) &&
+               ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
+               (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
+          NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+              + "lost heartbeat from " + nodeInfo.getName());
+          removeDatanode( nodeInfo );
+          if (heartbeats.size() > 0) {
+              nodeInfo = (DatanodeInfo) heartbeats.first();
+          }
         }
+      }
     }
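A node whose lastUpdate falls behind System.currentTimeMillis() - EXPIRE_INTERVAL is treated as dead: removeDatanode drops it from the heartbeat set and datanodeMap and strips its entries from the block location tables via removeStoredBlock.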
     
     /**
      * The given node is reporting all its blocks.  Use this info to 
      * update the (machine-->blocklist) and (block-->machinelist) tables.
      */
-    public synchronized Block[] processReport(Block newReport[], UTF8 name) {
+    public synchronized Block[] processReport(DatanodeID nodeID, 
+                                              Block newReport[]
+                                            ) throws IOException {
         NameNode.stateChangeLog.fine("BLOCK* NameSystem.processReport: "
-                +"from "+name+" "+newReport.length+" blocks" );
-        DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
-        if (node == null) {
-            NameNode.stateChangeLog.severe("BLOCK* NameSystem.processReport: "
-                    +"from "+name+" but can not find its info" );
-            throw new IllegalArgumentException("Unexpected exception.  Received block report from node " + name + ", but there is no info for " + name);
-        }
+          +"from "+nodeID.getName()+" "+newReport.length+" blocks" );
+        DatanodeInfo node = getDatanode( nodeID );
+        if (node == null)
+            throw new IOException(
+                "Got block report from unregistered node " + nodeID.getName());
 
         //
         // Modify the (block-->datanode) map, according to the difference
@@ -1150,7 +1266,7 @@ class FSNamesystem implements FSConstants {
             if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
                 obsolete.add(b);
                 NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
-                        +"ask "+name+" to delete "+b.getBlockName() );
+                        +"ask "+nodeID.getName()+" to delete "+b.getBlockName() );
             }
         }
         return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
@@ -1215,7 +1331,7 @@ class FSNamesystem implements FSConstants {
       Vector nonExcess = new Vector();
       for (Iterator it = containingNodes.iterator(); it.hasNext(); ) {
           DatanodeInfo cur = (DatanodeInfo) it.next();
-          TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+          TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getStorageID());
           if (excessBlocks == null || ! excessBlocks.contains(block)) {
               nonExcess.add(cur);
           }
@@ -1238,10 +1354,10 @@ class FSNamesystem implements FSConstants {
             DatanodeInfo cur = (DatanodeInfo) nonExcess.elementAt(chosenNode);
             nonExcess.removeElementAt(chosenNode);
 
-            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getName());
+            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(cur.getStorageID());
             if (excessBlocks == null) {
                 excessBlocks = new TreeSet();
-                excessReplicateMap.put(cur.getName(), excessBlocks);
+                excessReplicateMap.put(cur.getStorageID(), excessBlocks);
             }
             excessBlocks.add(b);
             NameNode.stateChangeLog.finer("BLOCK* NameSystem.chooseExcessReplicates: "
@@ -1256,10 +1372,10 @@ class FSNamesystem implements FSConstants {
             // should be deleted.  Items are removed from the invalidate list
             // upon giving instructions to the namenode.
             //
-            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getName());
+            Vector invalidateSet = (Vector) recentInvalidateSets.get(cur.getStorageID());
             if (invalidateSet == null) {
                 invalidateSet = new Vector();
-                recentInvalidateSets.put(cur.getName(), invalidateSet);
+                recentInvalidateSets.put(cur.getStorageID(), invalidateSet);
             }
             invalidateSet.add(b);
             NameNode.stateChangeLog.finer("BLOCK* NameSystem.chooseExcessReplicates: "
@@ -1299,13 +1415,13 @@ class FSNamesystem implements FSConstants {
         // We've removed a block from a node, so it's definitely no longer
         // in "excess" there.
         //
-        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
+        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getStorageID());
         if (excessBlocks != null) {
             excessBlocks.remove(block);
             NameNode.stateChangeLog.finer("BLOCK* NameSystem.removeStoredBlock: "
                     +block.getBlockName()+" is removed from excessBlocks" );
             if (excessBlocks.size() == 0) {
-                excessReplicateMap.remove(node.getName());
+                excessReplicateMap.remove(node.getStorageID());
             }
         }
     }
@@ -1313,15 +1429,20 @@ class FSNamesystem implements FSConstants {
     /**
      * The given node is reporting that it received a certain block.
      */
-    public synchronized void blockReceived(Block block, UTF8 name) {
-        DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
+    public synchronized void blockReceived( DatanodeID nodeID,  
+                                            Block block
+                                          ) throws IOException {
+        DatanodeInfo node = getDatanode( nodeID );
         if (node == null) {
             NameNode.stateChangeLog.warning("BLOCK* NameSystem.blockReceived: "
-                    +block.getBlockName()+" is received from an unrecorded node " + name );
-            throw new IllegalArgumentException("Unexpected exception.  Got blockReceived message from node " + name + ", but there is no info for " + name);
+             + block.getBlockName() + " is received from an unrecorded node " 
+             + nodeID.getName() );
+            throw new IllegalArgumentException(
+                "Unexpected exception.  Got blockReceived message from node " 
+                + nodeID.getName() + ", but there is no info for it");
         }
         NameNode.stateChangeLog.fine("BLOCK* NameSystem.blockReceived: "
-                +block.getBlockName()+" is received from " + name );
+                +block.getBlockName()+" is received from " + nodeID.getName() );
         //
         // Modify the blocks->datanode map
         // 
@@ -1374,8 +1495,9 @@ class FSNamesystem implements FSConstants {
     /**
      * Check if there are any recently-deleted blocks a datanode should remove.
      */
-    public synchronized Block[] blocksToInvalidate(UTF8 sender) {
-        Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender);
+    public synchronized Block[] blocksToInvalidate( DatanodeID nodeID ) {
+        Vector invalidateSet = (Vector) recentInvalidateSets.remove( 
+                                                      nodeID.getStorageID() );
  
         if (invalidateSet == null ) 
             return null;
@@ -1387,7 +1509,7 @@ class FSNamesystem implements FSConstants {
                 blockList.append(((Block)invalidateSet.elementAt(i)).getBlockName());
             }
             NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
-                   +"ask "+sender+" to delete " + blockList );
+                   +"ask "+nodeID.getName()+" to delete " + blockList );
         }
         return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
     }
@@ -1402,114 +1524,117 @@ class FSNamesystem implements FSConstants {
      *     target sequence for the Block at the appropriate index.
      *
      */
-    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) {
-        synchronized (neededReplications) {
-            Object results[] = null;
-            int scheduledXfers = 0;
-
-            if (neededReplications.size() > 0) {
-                //
-                // Go through all blocks that need replications.  See if any
-                // are present at the current node.  If so, ask the node to
-                // replicate them.
-                //
-                Vector replicateBlocks = new Vector();
-                Vector replicateTargetSets = new Vector();
-                for (Iterator it = neededReplications.iterator(); it.hasNext(); ) {
-                    //
-                    // We can only reply with 'maxXfers' or fewer blocks
-                    //
-                    if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) {
-                        break;
-                    }
-
-                    Block block = (Block) it.next();
-                    long blockSize = block.getNumBytes();
-                    FSDirectory.INode fileINode = dir.getFileByBlock(block);
-                    if( fileINode == null ) { // block does not belong to any file
-                        it.remove();
-                    } else {
-                        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
-                        TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(srcNode.getName());
-                        // srcNode must contain the block, and the block must 
-                        // not be scheduled for removal on that node
-                        if (containingNodes.contains(srcNode) 
-                              && ( excessBlocks == null 
-                               || ! excessBlocks.contains(block))) {
-                            DatanodeInfo targets[] = 
-                              chooseTargets(Math.min(fileINode.getReplication() 
-                                                       - containingNodes.size(), 
-                                                     maxReplicationStreams
-                                                       - xmitsInProgress), 
-                                                     containingNodes, null, 
-                                                     blockSize);
-                            if (targets.length > 0) {
-                                // Build items to return
-                                replicateBlocks.add(block);
-                                replicateTargetSets.add(targets);
-                                scheduledXfers += targets.length;
-                            }
-                        }
-                    }
-                }
+    public synchronized Object[] pendingTransfers(DatanodeInfo srcNode,
+                                                  int xmitsInProgress) {
+    synchronized (neededReplications) {
+      Object results[] = null;
+      int scheduledXfers = 0;
 
-                //
-                // Move the block-replication into a "pending" state.
-                // The reason we use 'pending' is so we can retry
-                // replications that fail after an appropriate amount of time.  
-                // (REMIND - mjc - this timer is not yet implemented.)
-                //
-                if (replicateBlocks.size() > 0) {
-                    int i = 0;
-                    for (Iterator it = replicateBlocks.iterator(); it.hasNext(); i++) {
-                        Block block = (Block) it.next();
-                        DatanodeInfo targets[] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
-                        TreeSet containingNodes = (TreeSet) blocksMap.get(block);
-
-                        if (containingNodes.size() + targets.length >= dir.getFileByBlock(block).getReplication()) {
-                            neededReplications.remove(block);
-                            pendingReplications.add(block);
-                            NameNode.stateChangeLog.finer("BLOCK* NameSystem.pendingTransfer: "
-                                    +block.getBlockName()
-                                    +" is removed from neededReplications to pendingReplications" );
-                        }
+      if (neededReplications.size() > 0) {
+        //
+        // Go through all blocks that need replications. See if any
+        // are present at the current node. If so, ask the node to
+        // replicate them.
+        //
+        Vector replicateBlocks = new Vector();
+        Vector replicateTargetSets = new Vector();
+        for (Iterator it = neededReplications.iterator(); it.hasNext();) {
+          //
+          // We can only reply with 'maxXfers' or fewer blocks
+          //
+          if (scheduledXfers >= this.maxReplicationStreams - xmitsInProgress) {
+            break;
+          }
 
-                        if(NameNode.stateChangeLog.isLoggable(Level.INFO)) {
-                            StringBuffer targetList = new StringBuffer( "datanode(s)");
-                            for(int k=0; k<targets.length; k++) {
-                               targetList.append(' ');
-                               targetList.append(targets[k].getName());
-                            }
-                            NameNode.stateChangeLog.info("BLOCK* NameSystem.pendingTransfer: "
-                                    +"ask "+srcNode.getName()
-                                    +" to replicate "+block.getBlockName()
-                                    +" to "+targetList);
-                        }
-                    }
+          Block block = (Block) it.next();
+          long blockSize = block.getNumBytes();
+          FSDirectory.INode fileINode = dir.getFileByBlock(block);
+          if (fileINode == null) { // block does not belong to any file
+            it.remove();
+          } else {
+            TreeSet containingNodes = (TreeSet) blocksMap.get(block);
+            TreeSet excessBlocks = (TreeSet) excessReplicateMap.get( 
+                                                      srcNode.getStorageID() );
+            // srcNode must contain the block, and the block must
+            // not be scheduled for removal on that node
+            if (containingNodes.contains(srcNode)
+                && (excessBlocks == null || ! excessBlocks.contains(block))) {
+              DatanodeInfo targets[] = chooseTargets(
+                  Math.min( fileINode.getReplication() - containingNodes.size(),
+                            this.maxReplicationStreams - xmitsInProgress), 
+                  containingNodes, null, blockSize);
+              if (targets.length > 0) {
+                // Build items to return
+                replicateBlocks.add(block);
+                replicateTargetSets.add(targets);
+                scheduledXfers += targets.length;
+              }
+            }
+          }
+        }
 
-                    //
-                    // Build returned objects from above lists
-                    //
-                    DatanodeInfo targetMatrix[][] = new DatanodeInfo[replicateTargetSets.size()][];
-                    for (i = 0; i < targetMatrix.length; i++) {
-                        targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
-                    }
+        //
+        // Move the block-replication into a "pending" state.
+        // The reason we use 'pending' is so we can retry
+        // replications that fail after an appropriate amount of time.
+        // (REMIND - mjc - this timer is not yet implemented.)
+        //
+        if (replicateBlocks.size() > 0) {
+          int i = 0;
+          for (Iterator it = replicateBlocks.iterator(); it.hasNext(); i++) {
+            Block block = (Block) it.next();
+            DatanodeInfo targets[] = 
+                      (DatanodeInfo[]) replicateTargetSets.elementAt(i);
+            TreeSet containingNodes = (TreeSet) blocksMap.get(block);
+
+            if (containingNodes.size() + targets.length >= 
+                    dir.getFileByBlock( block).getReplication() ) {
+              neededReplications.remove(block);
+              pendingReplications.add(block);
+              NameNode.stateChangeLog.finer(
+                "BLOCK* NameSystem.pendingTransfer: "
+                + block.getBlockName()
+                + " is removed from neededReplications to pendingReplications");
+            }
 
-                    results = new Object[2];
-                    results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);
-                    results[1]  = targetMatrix;
-                }
+            if (NameNode.stateChangeLog.isLoggable(Level.INFO)) {
+              StringBuffer targetList = new StringBuffer("datanode(s)");
+              for (int k = 0; k < targets.length; k++) {
+                targetList.append(' ');
+                targetList.append(targets[k].getName());
+              }
+              NameNode.stateChangeLog.info(
+                      "BLOCK* NameSystem.pendingTransfer: " + "ask "
+                      + srcNode.getName() + " to replicate "
+                      + block.getBlockName() + " to " + targetList);
             }
-            return results;
+          }
+
+          //
+          // Build returned objects from above lists
+          //
+          DatanodeInfo targetMatrix[][] = 
+                        new DatanodeInfo[replicateTargetSets.size()][];
+          for (i = 0; i < targetMatrix.length; i++) {
+            targetMatrix[i] = (DatanodeInfo[]) replicateTargetSets.elementAt(i);
+          }
+
+          results = new Object[2];
+          results[0] = replicateBlocks.toArray(new Block[replicateBlocks.size()]);
+          results[1] = targetMatrix;
         }
+      }
+      return results;
     }
+  }
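A worked example of the throttling above: with maxReplicationStreams = 2 and xmitsInProgress = 1, the loop schedules at most 2 - 1 = 1 new transfer per call, and a block of a replication-3 file currently held by a single node receives min(3 - 1, 1) = 1 target.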
 
     /**
      * Get a certain number of targets, if possible.
      * If not, return as many as we can.
-     * @param desiredReplicates number of duplicates wanted.
-     * @param forbiddenNodes of DatanodeInfo instances that should not be
-     * considered targets.
+     * @param desiredReplicates
+     *          number of duplicates wanted.
+     * @param forbiddenNodes
+     *          set of DatanodeInfo instances that should not be considered targets.
-     * @return array of DatanodeInfo instances uses as targets.
+     * @return array of DatanodeInfo instances used as targets.
      */
     DatanodeInfo[] chooseTargets(int desiredReplicates, TreeSet forbiddenNodes,
@@ -1685,4 +1810,45 @@ class FSNamesystem implements FSConstants {
         return clientMachine;
       }
     }
+
+    /**
+     * Get data node by storage ID.
+     * 
+     * @param nodeID
+     * @return DatanodeInfo or null if the node is not found.
+     * @throws IOException
+     */
+    public DatanodeInfo getDatanode( DatanodeID nodeID ) throws IOException {
+      UnregisteredDatanodeException e = null;
+      DatanodeInfo node = (DatanodeInfo) datanodeMap.get(nodeID.getStorageID());
+      if (node == null) 
+        return null;
+      if (!node.getName().equals(nodeID.getName())) {
+        e = new UnregisteredDatanodeException( nodeID, node );
+        NameNode.stateChangeLog.severe("BLOCK* NameSystem.getDatanode: "
+            + e.getLocalizedMessage() );
+        throw e;
+      }
+      return node;
+    }
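This lookup is what makes the storage ID authoritative: a request whose name does not match the entry stored under its ID is rejected, which is why registerDatanode above rewrites the stored name when a known datanode comes back on a new host or port.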
+    
+    /**
+     * Find data node by its name.
+     * 
+     * This method is called only when the node is registering,
+     * so it is not performance critical; otherwise an additional
+     * tree-like structure would be required.
+     * 
+     * @param name
+     * @return DatanodeInfo if found or null otherwise 
+     * @throws IOException
+     */
+    public DatanodeInfo getDatanodeByName( String name ) throws IOException {
+      for (Iterator it = datanodeMap.values().iterator(); it.hasNext(); ) {
+        DatanodeInfo node = (DatanodeInfo) it.next();
+        if( node.getName().equals(name) )
+           return node;
+      }
+      return null;
+    }
 }

+ 19 - 0
src/java/org/apache/hadoop/dfs/IncorrectVersionException.java

@@ -0,0 +1,19 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * The exception is thrown when the external version does not match 
+ * the current version of the application.
+ * 
+ * @author Konstantin Shvachko
+ */
+class IncorrectVersionException extends IOException {
+
+  public IncorrectVersionException( int version, String ofWhat ) {
+    super( "Unexpected version " 
+        + (ofWhat==null ? "" : "of " + ofWhat) + " reported: "
+        + version + ". Expecting " + FSConstants.DFS_CURRENT_VERSION + "." );
+  }
+
+}

+ 13 - 0
src/java/org/apache/hadoop/dfs/LeaseExpiredException.java

@@ -0,0 +1,13 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * The lease that was being used to create this file has expired.
+ * @author Owen O'Malley
+ */
+public class LeaseExpiredException extends IOException {
+  public LeaseExpiredException(String msg) {
+    super(msg);
+  }
+}

+ 63 - 50
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -144,37 +144,6 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         }
     }
 
-    /**
-     * The exception that happens when you ask to create a file that already
-     * is being created, but is not closed yet.
-     * @author Owen O'Malley
-     */
-    public static class AlreadyBeingCreatedException extends IOException {
-      public AlreadyBeingCreatedException(String msg) {
-        super(msg);
-      }
-    }
-    
-    /**
-     * The lease that was being used to create this file has expired.
-     * @author Owen O'Malley
-     */
-    public static class LeaseExpiredException extends IOException {
-      public LeaseExpiredException(String msg) {
-        super(msg);
-      }
-    }
-    
-    /**
-     * The file has not finished being written to enough datanodes yet.
-     * @author Owen O'Malley
-     */
-    public static class NotReplicatedYetException extends IOException {
-      public NotReplicatedYetException(String msg) {
-        super(msg);
-      }
-    }
-    
     /**
      */
     public LocatedBlock create(String src, 
@@ -230,7 +199,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
                 +targets.length + " locations" );
 
         for (int i = 0; i < targets.length; i++) {
-            namesystem.blockReceived(b, targets[i].getName());
+            namesystem.blockReceived( targets[i], b );
         }
     }
 
@@ -379,14 +348,26 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     ////////////////////////////////////////////////////////////////
     // DatanodeProtocol
     ////////////////////////////////////////////////////////////////
+    /** 
+     * Register the given datanode with the namenode.
+     */
+    public DatanodeRegistration register( DatanodeRegistration nodeReg
+                                        ) throws IOException {
+      verifyVersion( nodeReg.getVersion() );
+      namesystem.registerDatanode( nodeReg );
+      return nodeReg;
+    }
+    
     /**
      * Data node notify the name node that it is alive 
      * Return a block-oriented command for the datanode to execute.
      * This will be either a transfer or a delete operation.
      */
-    public BlockCommand sendHeartbeat(String sender, long capacity, long remaining,
-            int xmitsInProgress) {
-        namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);        
+    public BlockCommand sendHeartbeat(DatanodeRegistration nodeReg,
+                                      long capacity, 
+                                      long remaining,
+                                      int xmitsInProgress) throws IOException {
+        verifyRequest( nodeReg );
+        namesystem.gotHeartbeat( nodeReg, capacity, remaining );
         
         //
         // Only ask datanodes to perform block operations (transfer, delete) 
@@ -408,7 +389,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         //
         // Ask to perform pending transfers, if any
         //
-        Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), xmitsInProgress);
+        Object xferResults[] = namesystem.pendingTransfers(
+                       new DatanodeInfo( nodeReg ), xmitsInProgress );
         if (xferResults != null) {
             return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
         }
@@ -419,39 +401,70 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         // a block report.  This is just a small fast removal of blocks that have
         // just been removed.
         //
-        Block blocks[] = namesystem.blocksToInvalidate(new UTF8(sender));
+        Block blocks[] = namesystem.blocksToInvalidate( nodeReg );
         if (blocks != null) {
             return new BlockCommand(blocks);
         }
         return null;
     }
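A hypothetical datanode-side counterpart (the names dnRegistration, capacity, remaining, xmits, namenode, and shutdown() are assumed for illustration and are not taken from this change):

    try {
      BlockCommand cmd = namenode.sendHeartbeat(dnRegistration,
                                                capacity, remaining, xmits);
      if (cmd != null && cmd.shutdownNode()) {
        shutdown();                 // the namenode told this node to stop
      }
    } catch (UnregisteredDatanodeException e) {
      // registration or storage ID no longer recognized: register again
      dnRegistration = namenode.register(dnRegistration);
    }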
 
-    public Block[] blockReport(String sender, Block blocks[]) {
+    public Block[] blockReport( DatanodeRegistration nodeReg,
+                                Block blocks[]) throws IOException {
+        verifyRequest( nodeReg );
         stateChangeLog.fine("*BLOCK* NameNode.blockReport: "
-                +"from "+sender+" "+blocks.length+" blocks" );
+                +"from "+nodeReg.getName()+" "+blocks.length+" blocks" );
         if( firstBlockReportTime==0)
               firstBlockReportTime=System.currentTimeMillis();
 
-        return namesystem.processReport(blocks, new UTF8(sender));
+        return namesystem.processReport( nodeReg, blocks );
      }
 
-    public void blockReceived(String sender, Block blocks[]) {
+    public void blockReceived(DatanodeRegistration nodeReg, 
+                              Block blocks[]) throws IOException {
+        verifyRequest( nodeReg );
         stateChangeLog.fine("*BLOCK* NameNode.blockReceived: "
-                +"from "+sender+" "+blocks.length+" blocks." );
+                +"from "+nodeReg.getName()+" "+blocks.length+" blocks." );
         for (int i = 0; i < blocks.length; i++) {
-            namesystem.blockReceived(blocks[i], new UTF8(sender));
+            namesystem.blockReceived( nodeReg, blocks[i] );
         }
     }
 
     /**
      */
-    public void errorReport(String sender, int errorCode, String msg) {
-        // Log error message from datanode
-        LOG.warning("Report from " + sender + ": " + msg);
-        if( errorCode == DatanodeProtocol.DISK_ERROR ) {
-            namesystem.rmDataNodeByName(new UTF8(sender));            
-        }
-            
+    public void errorReport(DatanodeRegistration nodeReg,
+                            int errorCode, 
+                            String msg) throws IOException {
+      // Log error message from datanode
+      verifyRequest( nodeReg );
+      LOG.warning("Report from " + nodeReg.getName() + ": " + msg);
+      if( errorCode == DatanodeProtocol.DISK_ERROR ) {
+          namesystem.removeDatanode( nodeReg );            
+      }
+    }
+
+    /** 
+     * Verify request.
+     * 
+     * Verifies correctness of the datanode version and registration ID.
+     * 
+     * @param nodeReg data node registration
+     * @throws IOException
+     */
+    public void verifyRequest( DatanodeRegistration nodeReg ) throws IOException {
+      verifyVersion( nodeReg.getVersion() );
+      if( ! namesystem.getRegistrationID().equals( nodeReg.getRegistrationID() ))
+          throw new UnregisteredDatanodeException( nodeReg );
+    }
+    
+    /**
+     * Verify version.
+     * 
+     * @param version
+     * @throws IOException
+     */
+    public void verifyVersion( int version ) throws IOException {
+      if( version != DFS_CURRENT_VERSION )
+        throw new IncorrectVersionException( version, "data node" );
     }
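Unlike a registration mismatch, a version mismatch cannot be cured by re-registering; a sketch of how a datanode might react (LOG and shutdown() are assumed names):

    try {
      dnRegistration = namenode.register(dnRegistration);
    } catch (IncorrectVersionException ive) {
      LOG.severe("Incompatible DFS versions: " + ive.getLocalizedMessage());
      shutdown();                   // fatal; retrying cannot help
    }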
 
     /**

+ 13 - 0
src/java/org/apache/hadoop/dfs/NotReplicatedYetException.java

@@ -0,0 +1,13 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * The file has not finished being written to enough datanodes yet.
+ * @author Owen O'Malley
+ */
+public class NotReplicatedYetException extends IOException {
+  public NotReplicatedYetException(String msg) {
+    super(msg);
+  }
+}

+ 25 - 0
src/java/org/apache/hadoop/dfs/UnregisteredDatanodeException.java

@@ -0,0 +1,25 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+
+/**
+ * This exception is thrown when a datanode that has not previously 
+ * registered is trying to access the name node.
+ * 
+ * @author Konstantin Shvachko
+ */
+class UnregisteredDatanodeException extends IOException {
+
+  public UnregisteredDatanodeException( DatanodeID nodeID ) {
+    super("Unregistered data node: " + nodeID.getName() );
+  }
+
+  public UnregisteredDatanodeException( DatanodeID nodeID, 
+                                        DatanodeInfo storedNode ) {
+    super("Data node " + nodeID.getName() 
+        + "is attempting to report storage ID "
+        + nodeID.getStorageID() + ". Expecting " 
+        + storedNode.getStorageID() + ".");
+  }
+}

+ 5 - 5
src/test/org/apache/hadoop/fs/TestDFSIO.java

@@ -51,7 +51,7 @@ import org.apache.hadoop.conf.*;
  * <li>total number of bytes processed</li>
  * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
  * <li>average i/o rate in mb/sec per file</li>
- * <li>standard i/o rate deviation</li>
+ * <li>standard deviation of i/o rate</li>
  * </ul>
  *
  * @author Konstantin Shvachko
@@ -309,11 +309,11 @@ public class TestDFSIO extends TestCase {
       System.exit(-1);
     }
     for (int i = 0; i < args.length; i++) {       // parse command line
-      if (args[i].startsWith("-r")) {
+      if (args[i].startsWith("-read")) {
         testType = TEST_TYPE_READ;
-      } else if (args[i].startsWith("-w")) {
+      } else if (args[i].equals("-write")) {
         testType = TEST_TYPE_WRITE;
-      } else if (args[i].startsWith("-clean")) {
+      } else if (args[i].equals("-clean")) {
         testType = TEST_TYPE_CLEANUP;
       } else if (args[i].startsWith("-seq")) {
         isSequential = true;
@@ -410,7 +410,7 @@ public class TestDFSIO extends TestCase {
       "Total MBytes processed: " + size/MEGA,
       "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
       "Average IO rate mb/sec: " + med,
-      " Std IO rate deviation: " + stdDev,
+      " IO rate std deviation: " + stdDev,
       "    Test exec time sec: " + (float)execTime / 1000,
       "" };
 

+ 1 - 0
src/test/org/apache/hadoop/test/AllTestDriver.java

@@ -50,6 +50,7 @@ public class AllTestDriver {
 	    pgd.addClass("testsequencefileinputformat", TestSequenceFileInputFormat.class, "A test for sequence file input format.");
 	    pgd.addClass("testtextinputformat", TestTextInputFormat.class, "A test for text input format.");
       pgd.addClass("TestDFSIO", TestDFSIO.class, "Distributed i/o benchmark.");
+      pgd.addClass("DistributedFSCheck", TestDFSIO.class, "Distributed checkup of the file system consistency.");
 	    pgd.driver(argv);
 	}
 	catch(Throwable e){