Browse Source

HADOOP-396. Make DatanodeID implement Writable. Contributed by Konstantin.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@427673 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
fbd1feb0d6

+ 3 - 0
CHANGES.txt

@@ -111,6 +111,9 @@ Trunk (unreleased changes)
 31. HADOOP-394.  Change order of DFS shutdown in unit tests to
     minimize errors logged.  (Konstantin Shvachko via cutting)
 
+32. HADOOP-396.  Make DatanodeID implement Writable.
+    (Konstantin Shvachko via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

+ 2 - 1
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -27,7 +27,8 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;  // infoPort added to DatanodeID
+                                            // affected: DatanodeInfo, LocatedBlock
   
     ///////////////////////////////////////
     // File contents

+ 4 - 5
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -96,7 +96,6 @@ public class DataNode implements FSConstants, Runnable {
     long blockReportInterval;
     private DataStorage storage = null;
     private StatusHttpServer infoServer;
-    private int infoPort;
     private static InetSocketAddress nameNodeAddr;
     private static DataNode datanodeObject = null;
     private class DataNodeMetrics {
@@ -155,15 +154,15 @@ public class DataNode implements FSConstants, Runnable {
         this(InetAddress.getLocalHost().getHostName(), 
              new File(datadir),
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
-        infoPort = conf.getInt("dfs.datanode.info.port", 50075);
-        this.infoServer = new StatusHttpServer("datanode", infoPort, true);
+        int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
+        this.infoServer = new StatusHttpServer("datanode", infoServerPort, true);
         //create a servlet to serve full-file content
         try {
           this.infoServer.addServlet(null, "/streamFile/*",
                 "org.apache.hadoop.dfs.StreamFile", null);
         } catch (Exception e) {LOG.warn("addServlet threw exception", e);}
         this.infoServer.start();
-        infoPort = this.infoServer.getPort();
+        this.dnRegistration.infoPort = this.infoServer.getPort();
         // register datanode
         register();
         datanodeObject = this;
@@ -204,6 +203,7 @@ public class DataNode implements FSConstants, Runnable {
                                         DFS_CURRENT_VERSION, 
                                         machineName + ":" + tmpPort, 
                                         storage.getStorageID(),
+                                        -1,
                                         "" );
       // initialize data node internal structure
       this.data = new FSDataset(datadir, conf);
@@ -247,7 +247,6 @@ public class DataNode implements FSConstants, Runnable {
      * @throws IOException
      */
     private void register() throws IOException {
-      dnRegistration.infoPort = infoPort;
       dnRegistration = namenode.register( dnRegistration );
       if( storage.getStorageID().equals("") ) {
         storage.setStorageID( dnRegistration.getStorageID());

+ 2 - 13
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -30,7 +30,7 @@ class DatanodeDescriptor extends DatanodeInfo {
   private volatile TreeSet blocks = null;
 
   DatanodeDescriptor( DatanodeID nodeID ) {
-    this( nodeID.getName(), nodeID.getStorageID(), 0L, 0L, 0);
+    this( nodeID, 0L, 0L, 0 );
   }
   
   /**
@@ -40,18 +40,7 @@ class DatanodeDescriptor extends DatanodeInfo {
                       long capacity, 
                       long remaining,
                       int xceiverCount ) {
-    this( nodeID.getName(), nodeID.getStorageID(), capacity, remaining, xceiverCount );
-  }
-
-  /**
-   * @param name hostname:portNumber as String object.
-   */
-  DatanodeDescriptor( String name, 
-                      String storageID, 
-                      long capacity, 
-                      long remaining,
-                      int xceiverCount ) {
-    super( name, storageID );
+    super( nodeID );
     this.blocks = new TreeSet();
     updateHeartbeat(capacity, remaining, xceiverCount);
   }

+ 65 - 2
src/java/org/apache/hadoop/dfs/DatanodeID.java

@@ -1,5 +1,12 @@
 package org.apache.hadoop.dfs;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.WritableComparable;
+
 /**
  * DatanodeID is composed of the data node 
  * name (hostname:portNumber) and the data storage ID, 
@@ -7,10 +14,27 @@ package org.apache.hadoop.dfs;
  * 
  * @author Konstantin Shvachko
  */
-public class DatanodeID implements Comparable {
+public class DatanodeID implements WritableComparable {
 
   protected String name;      /// hostname:portNumber
   protected String storageID; /// unique per cluster storageID
+  protected int infoPort;     /// the port where the infoserver is running
+
+  /**
+   * DatanodeID default constructor
+   */
+  public DatanodeID() {
+    this( new String(), new String(), -1 );
+  }
+
+  /**
+   * DatanodeID copy constructor
+   * 
+   * @param from
+   */
+  public DatanodeID( DatanodeID from ) {
+    this( from.getName(), from.getStorageID(), from.getInfoPort() );
+  }
   
   /**
    * Create DatanodeID
@@ -18,9 +42,10 @@ public class DatanodeID implements Comparable {
    * @param nodeName (hostname:portNumber) 
    * @param storageID data storage ID
    */
-  public DatanodeID( String nodeName, String storageID ) {
+  public DatanodeID( String nodeName, String storageID, int infoPort ) {
     this.name = nodeName;
     this.storageID = storageID;
+    this.infoPort = infoPort;
   }
   
   /**
@@ -37,6 +62,13 @@ public class DatanodeID implements Comparable {
     return this.storageID;
   }
 
+  /**
+   * @return infoPort (the port at which the HTTP server bound to)
+   */
+  public int getInfoPort() {
+    return infoPort;
+  }
+  
   /**
    * @return hostname and no :portNumber.
    */
@@ -49,6 +81,14 @@ public class DatanodeID implements Comparable {
     }
   }
   
+  public boolean equals( Object to ) {
+    return (this.compareTo( to ) != 0);
+  }
+  
+  public int hashCode() {
+    return name.hashCode()^ storageID.hashCode();
+  }
+  
   public String toString() {
     return name;
   }
@@ -61,4 +101,27 @@ public class DatanodeID implements Comparable {
   public int compareTo(Object o) {
     return name.compareTo(((DatanodeID)o).getName());
   }
+
+  /////////////////////////////////////////////////
+  // Writable
+  /////////////////////////////////////////////////
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    UTF8.writeString(out, name);
+    UTF8.writeString(out, storageID);
+    out.writeShort(infoPort);
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    name = UTF8.readString(in);
+    storageID = UTF8.readString(in);
+    // the infoPort read could be negative, if the port is a large number (more
+    // than 15 bits in storage size (but less than 16 bits).
+    // So chop off the first two bytes (and hence the signed bits) before 
+    // setting the field.
+    this.infoPort = in.readShort() & 0x0000ffff;
+  }
 }

+ 13 - 24
src/java/org/apache/hadoop/dfs/DatanodeInfo.java

@@ -31,33 +31,32 @@ import org.apache.hadoop.io.WritableFactory;
  * @author Mike Cafarella
  * @author Konstantin Shvachko
  */
-public class DatanodeInfo extends DatanodeID implements Writable {
+public class DatanodeInfo extends DatanodeID {
   protected long capacity;
   protected long remaining;
   protected long lastUpdate;
   protected int xceiverCount;
-  protected int infoPort; //the port where the infoserver is running
 
   DatanodeInfo() {
-    this( new String(), new String() );
-  }
-  
-  DatanodeInfo( String name, String storageID) {
-    super( name, storageID );
-    this.capacity = 0L;
-    this.remaining = 0L;
-    this.lastUpdate = 0L;
-    this.xceiverCount = 0;
+    super();
   }
   
   DatanodeInfo( DatanodeInfo from ) {
-    super( from.getName(), from.getStorageID() );
+    super( from );
     this.capacity = from.getCapacity();
     this.remaining = from.getRemaining();
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
   }
 
+  DatanodeInfo( DatanodeID nodeID ) {
+    super( nodeID );
+    this.capacity = 0L;
+    this.remaining = 0L;
+    this.lastUpdate = 0L;
+    this.xceiverCount = 0;
+  }
+  
   /** The raw capacity. */
   public long getCapacity() { return capacity; }
 
@@ -73,9 +72,6 @@ public class DatanodeInfo extends DatanodeID implements Writable {
   /** @deprecated Use {@link #getLastUpdate()} instead. */
   public long lastUpdate() { return getLastUpdate(); }
 
-  /** The port at which the http server is running*/
-  public int infoPort() { return infoPort; }
-
   /** A formatted string for reporting the status of the DataNode. */
   public String getDatanodeReport() {
     StringBuffer buffer = new StringBuffer();
@@ -104,27 +100,20 @@ public class DatanodeInfo extends DatanodeID implements Writable {
   /**
    */
   public void write(DataOutput out) throws IOException {
-    new UTF8( this.name ).write(out);
-    new UTF8( this.storageID ).write(out);
+    super.write( out );
     out.writeLong(capacity);
     out.writeLong(remaining);
     out.writeLong(lastUpdate);
     out.writeInt(xceiverCount);
-    out.writeInt(infoPort);
   }
 
   /**
    */
   public void readFields(DataInput in) throws IOException {
-    UTF8 uStr = new UTF8();
-    uStr.readFields(in);
-    this.name = uStr.toString();
-    uStr.readFields(in);
-    this.storageID = uStr.toString();
+    super.readFields(in);
     this.capacity = in.readLong();
     this.remaining = in.readLong();
     this.lastUpdate = in.readLong();
     this.xceiverCount = in.readInt();
-    this.infoPort = in.readInt();
   }
 }

+ 3 - 1
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -29,7 +29,9 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  * @author Michael Cafarella
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
-  public static final long versionID = 1L;
+  public static final long versionID = 2L;  // infoPort added to DatanodeID
+                                            // affected: DatanodeRegistration
+  
   // error code
   final static int DISK_ERROR = 1;
   final static int INVALID_BLOCK = 2;

+ 7 - 15
src/java/org/apache/hadoop/dfs/DatanodeRegistration.java

@@ -28,13 +28,12 @@ class DatanodeRegistration extends DatanodeID implements Writable {
   int version;            /// current Datanode version
   String registrationID;  /// a unique per namenode id; indicates   
                           /// the namenode the datanode is registered with
-  int infoPort;
 
   /**
    * Default constructor.
    */
   public DatanodeRegistration() {
-    this( 0, null, null, null );
+    this( 0, null, null, -1, null );
   }
   
   /**
@@ -43,8 +42,9 @@ class DatanodeRegistration extends DatanodeID implements Writable {
   public DatanodeRegistration(int version, 
                               String nodeName, 
                               String storageID,
+                              int infoPort,
                               String registrationID ) {
-    super(nodeName, storageID);
+    super( nodeName, storageID, infoPort );
     this.version = version;
     this.registrationID = registrationID;
   }
@@ -68,23 +68,15 @@ class DatanodeRegistration extends DatanodeID implements Writable {
    */
   public void write(DataOutput out) throws IOException {
     out.writeInt(this.version);
-    new UTF8( this.name ).write(out);
-    new UTF8( this.storageID ).write(out);
-    new UTF8( this.registrationID ).write(out);   
-    out.writeInt(this.infoPort);
+    super.write( out );
+    UTF8.writeString(out, registrationID);
   }
 
   /**
    */
   public void readFields(DataInput in) throws IOException {
     this.version = in.readInt();
-    UTF8 uStr = new UTF8();
-    uStr.readFields(in);
-    this.name = uStr.toString();
-    uStr.readFields(in);
-    this.storageID = uStr.toString();
-    uStr.readFields(in);
-    this.registrationID = uStr.toString();   
-    this.infoPort = in.readInt();
+    super.readFields(in);
+    this.registrationID = UTF8.readString(in);   
   }
 }

+ 7 - 10
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1076,17 +1076,15 @@ class FSNamesystem implements FSConstants {
         // this is a new datanode serving a new data storage
         if( nodeReg.getStorageID().equals("") ) {
           // this data storage has never registered
-          // it is either empty or was created by previous version of DFS
+          // it is either empty or was created by pre-storageID version of DFS
           nodeReg.storageID = newStorageID();
           NameNode.stateChangeLog.debug(
               "BLOCK* NameSystem.registerDatanode: "
               + "new storageID " + nodeReg.getStorageID() + " assigned." );
         }
         // register new datanode
-        DatanodeDescriptor dinfo;
         datanodeMap.put(nodeReg.getStorageID(), 
-                        (dinfo = new DatanodeDescriptor( nodeReg ) ) ) ;
-        dinfo.infoPort = nodeReg.infoPort;
+                        new DatanodeDescriptor( nodeReg ));
         NameNode.stateChangeLog.debug(
             "BLOCK* NameSystem.registerDatanode: "
             + "node registered." );
@@ -1153,7 +1151,6 @@ class FSNamesystem implements FSConstants {
             NameNode.stateChangeLog.debug("BLOCK* NameSystem.gotHeartbeat: "
                     +"brand-new heartbeat from "+nodeID.getName() );
             nodeinfo = new DatanodeDescriptor(nodeID, capacity, remaining, xceiverCount);
-            nodeinfo.infoPort = ((DatanodeRegistration)nodeID).infoPort;
             datanodeMap.put(nodeinfo.getStorageID(), nodeinfo);
             capacityDiff = capacity;
             remainingDiff = remaining;
@@ -1188,7 +1185,7 @@ class FSNamesystem implements FSConstants {
     }
 
     /**
-     * remove a datanode info
+     * remove a datanode descriptor
      * @param nodeID datanode ID
      * @author hairong
      */
@@ -1204,8 +1201,8 @@ class FSNamesystem implements FSConstants {
   }
   
   /**
-   * remove a datanode info
-   * @param nodeInfo datanode info
+   * remove a datanode descriptor
+   * @param nodeInfo datanode descriptor
    * @author hairong
    */
     private void removeDatanode( DatanodeDescriptor nodeInfo ) {
@@ -1611,7 +1608,7 @@ class FSNamesystem implements FSConstants {
      *     target sequence for the Block at the appropriate index.
      *
      */
-    public synchronized Object[] pendingTransfers(DatanodeDescriptor srcNode,
+    public synchronized Object[] pendingTransfers(DatanodeID srcNode,
                                                   int xmitsInProgress) {
     synchronized (neededReplications) {
       Object results[] = null;
@@ -1963,7 +1960,7 @@ class FSNamesystem implements FSConstants {
         index = r.nextInt(size);
         DatanodeInfo d = getDatanodeByIndex(index);
         if (d != null) {
-          return d.getHost() + ":" + d.infoPort();
+          return d.getHost() + ":" + d.getInfoPort();
         }
       }
       return null;

+ 1 - 1
src/java/org/apache/hadoop/dfs/JspHelper.java

@@ -63,7 +63,7 @@ public class JspHelper {
         chosenNode = nodes[index];
 
         //just ping to check whether the node is alive
-        InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getHost() + ":" + chosenNode.infoPort());
+        InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getHost() + ":" + chosenNode.getInfoPort());
         
         try {
           s = new Socket();

+ 2 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -443,8 +443,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
         //
         // Ask to perform pending transfers, if any
         //
-        Object xferResults[] = namesystem.pendingTransfers(
-                       new DatanodeDescriptor( nodeReg ), xmitsInProgress );
+        Object xferResults[] = namesystem.pendingTransfers( nodeReg,
+                                                            xmitsInProgress );
         if (xferResults != null) {
             return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]);
         }

+ 2 - 2
src/webapps/datanode/browseBlock.jsp

@@ -107,7 +107,7 @@
                                            datanodeAddr.indexOf(':') + 1, 
                                       datanodeAddr.length())); 
             nextHost = InetAddress.getByName(d.getHost()).getCanonicalHostName();
-            nextPort = d.infoPort(); 
+            nextPort = d.getInfoPort(); 
           }
         }
       }
@@ -154,7 +154,7 @@
                                           datanodeAddr.indexOf(':') + 1, 
                                       datanodeAddr.length())); 
             prevHost = InetAddress.getByName(d.getHost()).getCanonicalHostName();
-            prevPort = d.infoPort();
+            prevPort = d.getInfoPort();
           }
         }
       }

+ 3 - 3
src/webapps/datanode/browseData.jsp

@@ -56,7 +56,7 @@
     String fqdn = 
            InetAddress.getByName(chosenNode.getHost()).getCanonicalHostName();
     String tailUrl = "http://" + fqdn + ":" +
-                     chosenNode.infoPort() + 
+                     chosenNode.getInfoPort() + 
                      "/tail.jsp?filename=" + filename;
     out.print("<a href=\"" + tailUrl + "\">TAIL this file</a><br>");
 
@@ -77,7 +77,7 @@
                                       datanodeAddr.length())); 
     fqdn = InetAddress.getByName(chosenNode.getHost()).getCanonicalHostName();
     String chunkViewUrl = "http://" + fqdn + ":" +
-                     chosenNode.infoPort() + 
+                     chosenNode.getInfoPort() + 
                      "/browseBlock.jsp?blockId=" + Long.toString(blockId) +
                      "&tail=false&blockSize=" + blockSize +
                      "&filename=" + filename +
@@ -105,7 +105,7 @@
                                       datanodeAddr.length())); 
         fqdn = InetAddress.getByName(locs[j].getHost()).getCanonicalHostName();
         String blockUrl = "http://"+ fqdn + ":" +
-                          locs[j].infoPort() +
+                          locs[j].getInfoPort() +
                           "/browseBlock.jsp?blockId=" + Long.toString(blockId) +
                           "&blockSize=" + blockSize +
                           "&filename=" + filename + 

+ 1 - 1
src/webapps/datanode/browseDirectory.jsp

@@ -62,7 +62,7 @@
         DatanodeInfo chosenNode = jspHelper.bestNode(blocks[0]);
         String fqdn = InetAddress.getByName(chosenNode.getHost()).getCanonicalHostName();
         String datanodeUrl = "http://"+fqdn+":" +
-                             chosenNode.infoPort() + 
+                             chosenNode.getInfoPort() + 
                              "/browseData.jsp?filename=" +
                              files[i].getPath() + "&blockSize=" + 
                              files[i].getBlockSize();

+ 1 - 1
src/webapps/dfs/browseDirectory.jsp

@@ -62,7 +62,7 @@
         DatanodeInfo chosenNode = jspHelper.bestNode(blocks[0]);
         String fqdn = InetAddress.getByName(chosenNode.getHost()).getCanonicalHostName();
         String datanodeUrl = "http://"+fqdn+":" +
-                             chosenNode.infoPort() + 
+                             chosenNode.getInfoPort() + 
                              "/browseData.jsp?filename=" +
                              files[i].getPath() + "&blockSize=" + 
                              files[i].getBlockSize();