
HADOOP-985. Change HDFS to identify nodes by IP address rather than DNS hostname. Contributed by Raghu.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@510636 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
Parent
Commit
c51dd4a38c

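In outline, the patch moves the question of which machine is calling from the client to the RPC server: open() and create() lose their clientMachine argument, and the namenode instead derives the caller's address from the connection the call arrived on (see the Server.java hunk below) so that DataNodes can be sorted by distance to that address. A hedged before/after sketch of the client-visible protocol, abridged from the diff; LocatedBlock is stubbed only so the sketch stands alone:

import java.io.IOException;

// Before HADOOP-985: the client passed its own hostname over the wire.
interface ClientProtocolBefore {
    LocatedBlock[] open(String clientMachine, String src) throws IOException;
    LocatedBlock create(String src, String clientName, String clientMachine,
                        boolean overwrite, short replication, long blockSize) throws IOException;
}

// After HADOOP-985: no clientMachine; the namenode reads the caller's IP
// from the RPC layer (Server.getRemoteAddress()).
interface ClientProtocolAfter {
    LocatedBlock[] open(String src) throws IOException;
    LocatedBlock create(String src, String clientName,
                        boolean overwrite, short replication, long blockSize) throws IOException;
}

class LocatedBlock {}  // stub for this sketch only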
+ 3 - 0
CHANGES.txt

@@ -113,6 +113,9 @@ Trunk (unreleased changes)
     failed more than a specified number in the job.
     (Arun C Murthy via cutting)
 
+34. HADOOP-985.  Change HDFS to identify nodes by IP address rather
+    than by DNS hostname.  (Raghu Angadi via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

+ 7 - 5
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -29,7 +29,11 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-    public static final long versionID = 8L; // refreshNodes added
+    /* 7 : periodic checkpoint added.
+     * 8 : refreshNodes added
+     * 9 : clientMachine is removed from open() and create().
+     */
+    public static final long versionID = 9L;  
   
     ///////////////////////////////////////
     // File contents
@@ -37,14 +41,13 @@ interface ClientProtocol extends VersionedProtocol {
     /**
      * Open an existing file, at the given name.  Returns block 
      * and DataNode info.  DataNodes for each block are sorted by
-     * the distance to the clientMachine, which contains the host name.
+     * the distance to the client's address.
      * The client will then have to contact
      * each indicated DataNode to obtain the actual data.  There
      * is no need to call close() or any other function after
      * calling open().
      */
-    public LocatedBlock[] open( String clientMachine,
-                                String src) throws IOException;
+    public LocatedBlock[] open(String src) throws IOException;
 
     /**
      * Create a new file.  Get back block and datanode info,
@@ -61,7 +64,6 @@ interface ClientProtocol extends VersionedProtocol {
      */
     public LocatedBlock create( String src, 
                                 String clientName, 
-                                String clientMachine, 
                                 boolean overwrite, 
                                 short replication,
                                 long blockSize

+ 2 - 8
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -47,7 +47,6 @@ class DFSClient implements FSConstants {
     private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
     private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
     ClientProtocol namenode;
-    String localName;
     boolean running = true;
     Random r = new Random();
     String clientName;
@@ -105,11 +104,6 @@ class DFSClient implements FSConstants {
         this.conf = conf;
         this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
             ClientProtocol.versionID, nameNodeAddr, conf);
-        try {
-            this.localName = InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException uhe) {
-            this.localName = "";
-        }
         String taskId = conf.get("mapred.task.id");
         if (taskId != null) {
             this.clientName = "DFSClient_" + taskId; 
@@ -514,7 +508,7 @@ class DFSClient implements FSConstants {
         synchronized void openInfo() throws IOException {
             Block oldBlocks[] = this.blocks;
 
-            LocatedBlock results[] = namenode.open(localName, src);            
+            LocatedBlock results[] = namenode.open(src);            
             Vector blockV = new Vector();
             Vector nodeV = new Vector();
             for (int i = 0; i < results.length; i++) {
@@ -1089,7 +1083,7 @@ class DFSClient implements FSConstants {
             while (true) {
               try {
                 return namenode.create(src.toString(), clientName.toString(),
-                    localName, overwrite, replication, blockSize);
+                                       overwrite, replication, blockSize);
               } catch (RemoteException e) {
                 if (--retries == 0 || 
                     !AlreadyBeingCreatedException.class.getName().

+ 5 - 5
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -121,6 +121,7 @@ public class DataNode implements FSConstants, Runnable {
     private DataNodeMetrics myMetrics = new DataNodeMetrics();
     private static InetSocketAddress nameNodeAddr;
     private static DataNode datanodeObject = null;
+    String machineName;
 
     private class DataNodeMetrics implements Updater {
       private final MetricsRecord metricsRecord;
@@ -194,9 +195,7 @@ public class DataNode implements FSConstants, Runnable {
     }
     
     DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
-        this(InetAddress.getLocalHost().getHostName(),
-             networkLoc,
-             dataDirs,
+        this(networkLoc, dataDirs,
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
         // register datanode
         int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
@@ -225,8 +224,7 @@ public class DataNode implements FSConstants, Runnable {
      * 
      * @see DataStorage
      */
-    private DataNode(String machineName,
-                    String networkLoc,
+    private DataNode(String networkLoc,
                     String[] dataDirs, 
                     InetSocketAddress nameNodeAddr, 
                     Configuration conf ) throws IOException {
@@ -322,6 +320,8 @@ public class DataNode implements FSConstants, Runnable {
     private void register() throws IOException {
       while( true ) {
         try {
+          // reset name to machineName. Mainly for web interface.
+          dnRegistration.name = machineName + ":" + dnRegistration.getPort();
           dnRegistration = namenode.register( dnRegistration, networkLoc );
           break;
         } catch( SocketTimeoutException e ) {  // namenode is busy

+ 17 - 3
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -69,8 +69,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * @param nodeID id of the data node
    * @param networkLocation location of the data node in network
    */
-  public DatanodeDescriptor( DatanodeID nodeID, String networkLocation ) {
-    this( nodeID, networkLocation, 0L, 0L, 0 );
+  public DatanodeDescriptor( DatanodeID nodeID, 
+                             String networkLocation ) {
+    this( nodeID, networkLocation, null );
+  }
+  
+  /** DatanodeDescriptor constructor
+   * 
+   * @param nodeID id of the data node
+   * @param networkLocation location of the data node in network
+   * @param hostName it could be different from host specified for DatanodeID
+   */
+  public DatanodeDescriptor( DatanodeID nodeID, 
+                             String networkLocation,
+                             String hostName ) {
+    this( nodeID, networkLocation, hostName, 0L, 0L, 0 );
   }
   
   /** DatanodeDescriptor constructor
@@ -99,10 +112,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   public DatanodeDescriptor( DatanodeID nodeID,
                               String networkLocation,
+                              String hostName,
                               long capacity, 
                               long remaining,
                               int xceiverCount ) {
-    super( nodeID, networkLocation );
+    super( nodeID, networkLocation, hostName );
     updateHeartbeat( capacity, remaining, xceiverCount);
     initWorkLists();
   }

+ 8 - 0
src/java/org/apache/hadoop/dfs/DatanodeID.java

@@ -88,6 +88,14 @@ public class DatanodeID implements WritableComparable {
     }
   }
   
+  public int getPort() {
+    int colon = name.indexOf(":");
+    if ( colon < 0 ) {
+      return 50010; // default port.
+    }
+    return Integer.parseInt(name.substring(colon+1));
+  }
+
   public boolean equals( Object to ) {
     return (name.equals(((DatanodeID)to).getName()) &&
         storageID.equals(((DatanodeID)to).getStorageID()));

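The new DatanodeID.getPort() leans on the convention that a node's name is either a bare host/IP or host:port, falling back to 50010 (the default datanode port) when no port suffix is present. A standalone illustration of that parsing, not Hadoop's class:

// Parse the port out of a "host:port" datanode name; 50010 is the
// default data transfer port when the name carries no ":port" suffix.
public class DatanodeNamePort {
    static int port(String name) {
        int colon = name.indexOf(':');
        return (colon < 0) ? 50010 : Integer.parseInt(name.substring(colon + 1));
    }

    public static void main(String[] args) {
        System.out.println(port("192.168.0.5:50010"));      // 50010
        System.out.println(port("192.168.0.5"));            // 50010 (default)
        System.out.println(port("dn1.example.com:50011"));  // 50011
    }
}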
+ 17 - 1
src/java/org/apache/hadoop/dfs/DatanodeInfo.java

@@ -48,6 +48,11 @@ public class DatanodeInfo extends DatanodeID implements Node {
   protected int xceiverCount;
   private String location = NetworkTopology.DEFAULT_RACK;
 
+  /** HostName as supplied by the datanode during registration as its
+   * name. Namenode uses datanode IP address as the name.
+   */
+  private String hostName = null;
+  
   // administrative states of a datanode
   public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
   protected AdminStates adminState;
@@ -66,6 +71,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     this.xceiverCount = from.getXceiverCount();
     this.location = from.getNetworkLocation();
     this.adminState = from.adminState;
+    this.hostName = from.hostName;
   }
 
   DatanodeInfo( DatanodeID nodeID ) {
@@ -77,9 +83,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
       this.adminState = null;    
   }
   
-  DatanodeInfo( DatanodeID nodeID, String location ) {
+  DatanodeInfo( DatanodeID nodeID, String location, String hostName ) {
       this(nodeID);
       this.location = location;
+      this.hostName = hostName;
   }
   
   /** The raw capacity. */
@@ -126,6 +133,15 @@ public class DatanodeInfo extends DatanodeID implements Node {
       return location+NodeBase.PATH_SEPARATOR_STR+name;
   }
 
+  
+  public String getHostName() {
+    return ( hostName == null || hostName.length()==0 ) ? getHost() : hostName;
+  }
+  
+  public void setHostName( String host ) {
+    hostName = host;
+  }
+  
   /** A formatted string for reporting the status of the DataNode. */
   public String getDatanodeReport() {
     StringBuffer buffer = new StringBuffer();

+ 20 - 2
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.Server;
 
 import java.io.*;
 import java.util.*;
@@ -1211,7 +1212,7 @@ class FSNamesystem implements FSConstants {
                 Collection<String> v = new ArrayList<String>();
                 if (containingNodes != null) {
                   for (Iterator<DatanodeDescriptor> it =containingNodes.iterator(); it.hasNext();) {
-                    v.add( it.next().getHost() );
+                    v.add( it.next().getHostName() );
                   }
                 }
                 hosts[i-startBlock] = v.toArray(new String[v.size()]);
@@ -1469,6 +1470,22 @@ class FSNamesystem implements FSConstants {
     public synchronized void registerDatanode( DatanodeRegistration nodeReg,
                                                String networkLocation
                                               ) throws IOException {
+
+      String dnAddress = Server.getRemoteAddress();
+      if ( dnAddress == null ) {
+        //Mostly not called inside an RPC.
+        throw new IOException( "Could not find remote address for " +
+                               "registration from " + nodeReg.getName() );
+      }      
+
+      String hostName = nodeReg.getHost();
+      
+      // update the datanode's name with ip:port
+      DatanodeID dnReg = new DatanodeID( dnAddress + ":" + nodeReg.getPort(),
+                                         nodeReg.getStorageID(),
+                                         nodeReg.getInfoPort() );
+      nodeReg.updateRegInfo( dnReg );
+      
       NameNode.stateChangeLog.info(
           "BLOCK* NameSystem.registerDatanode: "
           + "node registration from " + nodeReg.getName()
@@ -1513,6 +1530,7 @@ class FSNamesystem implements FSConstants {
         nodeS.updateRegInfo( nodeReg );
         nodeS.setNetworkLocation( networkLocation );
         clusterMap.add( nodeS );
+        nodeS.setHostName( hostName );
         getEditLog().logAddDatanode( nodeS );
         
         // also treat the registration message as a heartbeat
@@ -1536,7 +1554,7 @@ class FSNamesystem implements FSConstants {
       }
       // register new datanode
       DatanodeDescriptor nodeDescr 
-                  = new DatanodeDescriptor( nodeReg, networkLocation );
+              = new DatanodeDescriptor( nodeReg, networkLocation, hostName );
       unprotectedAddDatanode( nodeDescr );
       getEditLog().logAddDatanode( nodeDescr );
       

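The registration path is where the hostname-to-IP switch actually happens: the datanode's self-reported name contributes only the display hostName, while the canonical name is rebuilt as ip:port from the address observed on the RPC socket. A small sketch of that rewrite; the class and field names here are illustrative, not Hadoop's API:

// Sketch of the renaming done in registerDatanode(): keep the reported
// hostname for the web UI, but store "ip:port" as the node's name, with
// the ip taken from Server.getRemoteAddress().
public class RegistrationRename {
    final String canonicalName;  // what the namenode stores, e.g. "192.168.0.5:50010"
    final String hostName;       // what the web UI shows, e.g. "dn1.hadoop.apache.org"

    RegistrationRename(String reportedHost, int reportedPort, String remoteIp) {
        this.canonicalName = remoteIp + ":" + reportedPort;
        this.hostName = reportedHost;
    }

    public static void main(String[] args) {
        RegistrationRename r =
            new RegistrationRename("dn1.hadoop.apache.org", 50010, "192.168.0.5");
        System.out.println(r.canonicalName);  // 192.168.0.5:50010
        System.out.println(r.hostName);       // dn1.hadoop.apache.org
    }
}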
+ 1 - 1
src/java/org/apache/hadoop/dfs/JspHelper.java

@@ -236,7 +236,7 @@ public class JspHelper {
                     ret = (ddbl < 0) ? -1 : ( (ddbl > 0) ? 1 : 0 );
                     break;
                 case FIELD_NAME: 
-                    ret = d1.getName().compareTo(d2.getName());
+                    ret = d1.getHostName().compareTo(d2.getHostName());
                     break;
                 }
                 return ( sortOrder == SORT_ORDER_DSC ) ? -ret : ret;

+ 9 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -236,7 +236,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     
     /**
      */
-    public LocatedBlock[] open(String clientMachine, String src) throws IOException {
+    public LocatedBlock[] open(String src) throws IOException {
+        String clientMachine = Server.getRemoteAddress();
+        if ( clientMachine == null ) {
+            clientMachine = "";
+        }
         Object openResults[] = namesystem.open(clientMachine, new UTF8(src));
         if (openResults == null) {
             throw new IOException("Cannot open filename " + src);
@@ -256,11 +260,14 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
      */
     public LocatedBlock create(String src, 
                                String clientName, 
-                               String clientMachine, 
                                boolean overwrite,
                                short replication,
                                long blockSize
     ) throws IOException {
+       String clientMachine = Server.getRemoteAddress();
+       if ( clientMachine == null ) {
+           clientMachine = "";
+       }
        stateChangeLog.debug("*DIR* NameNode.create: file "
             +src+" for "+clientName+" at "+clientMachine);
        if (!checkPathLength(src)) {

+ 1 - 6
src/java/org/apache/hadoop/dfs/NamenodeFsck.java

@@ -153,8 +153,7 @@ public class NamenodeFsck {
     }
     res.totalFiles++;
     res.totalSize += file.getLen();
-    LocatedBlock[] blocks = nn.open(DNS.getDefaultHost("default"),
-                                    file.getPath());
+    LocatedBlock[] blocks = nn.open(file.getPath());
     res.totalBlocks += blocks.length;
     if (showFiles) {
       out.print(file.getPath() + " " + file.getLen() + ", " + blocks.length + " block(s): ");
@@ -416,10 +415,6 @@ public class NamenodeFsck {
       if (colon >= 0) {
         nodename = nodename.substring(0, colon);
       }
-      if (dfs.localName.equals(nodename)) {
-        chosenNode = nodes[i];
-        break;
-      }
     }
     if (chosenNode == null) {
       do  {

+ 0 - 1
src/java/org/apache/hadoop/ipc/RPC.java

@@ -32,7 +32,6 @@ import java.io.*;
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.*;
 
 /** A simple RPC mechanism.

+ 29 - 2
src/java/org/apache/hadoop/ipc/Server.java

@@ -80,15 +80,39 @@ public abstract class Server {
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.Server");
 
-  private static final ThreadLocal SERVER = new ThreadLocal();
+  private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
 
   /** Returns the server instance called under or null.  May be called under
    * {@link #call(Writable)} implementations, and under {@link Writable}
    * methods of paramters and return values.  Permits applications to access
    * the server context.*/
   public static Server get() {
-    return (Server)SERVER.get();
+    return SERVER.get();
   }
+ 
+  /** This is set to Call object before Handler invokes an RPC and reset
+   * after the call returns.
+   */
+  private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+  
+  /** Returns the remote side ip address when invoked inside an RPC 
+   *  Returns null in case of an error.
+   */
+  public static InetAddress getRemoteIp() {
+    Call call = CurCall.get();
+    if ( call != null ) {
+      return call.connection.socket.getInetAddress();
+    }
+    return null;
+  }
+  /** Returns remote address as a string when invoked inside an RPC.
+   *  Returns null in case of an error.
+   */
+  public static String getRemoteAddress() {
+    InetAddress addr = getRemoteIp();
+    return ( addr == null ) ? null : addr.getHostAddress();
+  }
+  
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
@@ -529,6 +553,8 @@ public abstract class Server {
           String errorClass = null;
           String error = null;
           Writable value = null;
+          
+          CurCall.set( call );
           try {
             value = call(call.param);             // make the call
           } catch (Throwable e) {
@@ -536,6 +562,7 @@ public abstract class Server {
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
           }
+          CurCall.set( null );
             
           DataOutputStream out = call.connection.out;
           synchronized (out) {

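Server.getRemoteAddress() works because the handler thread stashes the in-flight Call in a ThreadLocal before dispatching it and clears it afterwards, so any code running inside the RPC (NameNode.open(), FSNamesystem.registerDatanode()) can recover the caller's socket address without it being threaded through as a parameter. A self-contained sketch of that pattern, with hypothetical names (RequestContext, handle) standing in for Hadoop's Call and Handler:

import java.net.InetAddress;
import java.net.InetSocketAddress;

public class RequestContext {
    // Set before a request is dispatched on this thread, cleared after.
    private static final ThreadLocal<InetSocketAddress> CUR_CALL =
        new ThreadLocal<InetSocketAddress>();

    /** Remote IP of the request this thread is handling, or null outside a call. */
    public static String getRemoteAddress() {
        InetSocketAddress addr = CUR_CALL.get();
        return (addr == null) ? null : addr.getAddress().getHostAddress();
    }

    /** Wraps the dispatch of one request, mirroring what Handler.run() does. */
    public static void handle(InetSocketAddress client, Runnable call) {
        CUR_CALL.set(client);
        try {
            call.run();            // make the call
        } finally {
            CUR_CALL.set(null);    // reset after the call returns
        }
    }

    public static void main(String[] args) throws Exception {
        InetSocketAddress client =
            new InetSocketAddress(InetAddress.getByName("192.168.0.5"), 40010);
        handle(client, new Runnable() {
            public void run() {
                System.out.println("inside call: " + getRemoteAddress());  // 192.168.0.5
            }
        });
        System.out.println("outside call: " + getRemoteAddress());         // null
    }
}

(This sketch resets the ThreadLocal in a finally block; the patch itself resets CurCall after the try/catch around call(), which covers both the normal and error paths because Throwable is caught.)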
+ 1 - 1
src/java/org/apache/hadoop/net/DNS.java

@@ -168,7 +168,7 @@ public class DNS {
     if (strInterface.equals("default")) 
       return InetAddress.getLocalHost().getCanonicalHostName();
 
-    if (nameserver.equals("default"))
+    if (nameserver != null && nameserver.equals("default"))
       return getDefaultHost(strInterface);
 
     String[] hosts = getHosts(strInterface, nameserver);

+ 1 - 2
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -70,8 +70,7 @@ public class TestReplication extends TestCase {
               DataNode.createSocketAddr(conf.get("fs.default.name")), 
               conf);
            
-      LocatedBlock[] locations = namenode.open(
-              DNS.getDefaultHost("default"), name.toString());
+      LocatedBlock[] locations = namenode.open(name.toString());
       boolean isOnSameRack = true, isNotOnSameRack = true;
       for (int idx = 0; idx < locations.length; idx++) {
           DatanodeInfo[] datanodes = locations[idx].getLocations();

+ 3 - 6
src/webapps/datanode/browseBlock.jsp

@@ -68,8 +68,7 @@
     blockSize = Long.parseLong(blockSizeStr);
 
     DFSClient dfs = new DFSClient(jspHelper.nameNodeAddr, jspHelper.conf);
-    LocatedBlock[] blocks = dfs.namenode.open(
-        DNS.getDefaultHost("default"), filename);
+    LocatedBlock[] blocks = dfs.namenode.open(filename);
     //Add the various links for looking at the file contents
     //URL for downloading the full file
     String downloadUrl = "http://" + req.getServerName() + ":" +
@@ -231,8 +230,7 @@
     //determine data for the next link
     if (startOffset + chunkSizeToView >= blockSize) {
       //we have to go to the next block from this point onwards
-      LocatedBlock[] blocks = dfs.namenode.open(
-           DNS.getDefaultHost("default"), filename);
+      LocatedBlock[] blocks = dfs.namenode.open(filename);
       for (int i = 0; i < blocks.length; i++) {
         if (blocks[i].getBlock().getBlockId() == blockId) {
           if (i != blocks.length - 1) {
@@ -279,8 +277,7 @@
     int prevPort = req.getServerPort();
     int prevDatanodePort = datanodePort;
     if (startOffset == 0) {
-      LocatedBlock [] blocks = dfs.namenode.open(
-              DNS.getDefaultHost("default"), filename);
+      LocatedBlock [] blocks = dfs.namenode.open(filename);
       for (int i = 0; i < blocks.length; i++) {
         if (blocks[i].getBlock().getBlockId() == blockId) {
           if (i != 0) {

+ 2 - 4
src/webapps/datanode/browseDirectory.jsp

@@ -33,8 +33,7 @@
     DFSClient dfs = new DFSClient(jspHelper.nameNodeAddr, jspHelper.conf);
     UTF8 target = new UTF8(dir);
     if( !dfs.isDirectory(target) ) { // a file
-      LocatedBlock[] blocks = dfs.namenode.open(
-          DNS.getDefaultHost("default"), dir);
+      LocatedBlock[] blocks = dfs.namenode.open(dir);
       DatanodeInfo [] locations = blocks[0].getLocations();
       if (locations.length == 0) {
         out.print("Empty file");
@@ -88,8 +87,7 @@
       //Get the location of the first block of the file
       if (files[i].getPath().endsWith(".crc")) continue;
       if (!files[i].isDir()) {
-        LocatedBlock[] blocks = dfs.namenode.open(
-            DNS.getDefaultHost("default"), files[i].getPath());
+        LocatedBlock[] blocks = dfs.namenode.open(files[i].getPath());
 
         DatanodeInfo [] locations = blocks[0].getLocations();
         if (locations.length == 0) {

+ 1 - 2
src/webapps/datanode/tail.jsp

@@ -55,8 +55,7 @@
     //fetch the block from the datanode that has the last block for this file
     DFSClient dfs = new DFSClient(jspHelper.nameNodeAddr, 
                                          jspHelper.conf);
-    LocatedBlock blocks[] = dfs.namenode.open(
-        DNS.getDefaultHost("default"), filename); 
+    LocatedBlock blocks[] = dfs.namenode.open(filename); 
     if (blocks == null || blocks.length == 0) {
       out.print("No datanodes contain blocks of file "+filename);
       dfs.close();

+ 22 - 12
src/webapps/dfs/dfshealth.jsp

@@ -50,24 +50,34 @@
   public void generateNodeData( JspWriter out, DatanodeDescriptor d,
                                     String suffix, boolean alive )
     throws IOException {
-    
-    String url = d.getName();
-    if ( url.indexOf( ':' ) >= 0 )
-        url = url.substring( 0, url.indexOf( ':' ) );
+      
+    /* Say the datanode is dn1.hadoop.apache.org with ip 192.168.0.5
+       we use:
+       1) d.getHostName():d.getPort() to display.
+           Domain and port are stripped if they are common across the nodes.
+           i.e. "dn1"
+       2) d.getHost():d.getPort() for "title".
+          i.e. "192.168.0.5:50010"
+       3) d.getHostName():d.getInfoPort() for url.
+          i.e. "http://dn1.hadoop.apache.org:50075/..."
+          Note that "d.getHost():d.getPort()" is what DFS clients use
+          to interact with datanodes.
+    */
     // from nn_browsedfscontent.jsp:
-    url = "http://" + url + ":" + d.getInfoPort() +
-          "/browseDirectory.jsp?namenodeInfoPort=" +
-          fsn.getNameNodeInfoPort() + "&dir=" +
-          URLEncoder.encode("/", "UTF-8");
-    
-    String name = d.getName();
+    String url = "http://" + d.getHostName() + ":" + d.getInfoPort() +
+                 "/browseDirectory.jsp?namenodeInfoPort=" +
+                 fsn.getNameNodeInfoPort() + "&dir=" +
+                 URLEncoder.encode("/", "UTF-8");
+     
+    String name = d.getHostName() + ":" + d.getPort();
     if ( !name.matches( "\\d+\\.\\d+.\\d+\\.\\d+.*" ) ) 
         name = name.replaceAll( "\\.[^.:]*", "" );    
     int idx = (suffix != null && name.endsWith( suffix )) ?
         name.indexOf( suffix ) : -1;
     
-    out.print( rowTxt() + "<td class=\"name\"><a title=\"" + d.getName() +
-               "\"href=\"" + url + "\">" +
+    out.print( rowTxt() + "<td class=\"name\"><a title=\""
+               + d.getHost() + ":" + d.getPort() +
+               "\" href=\"" + url + "\">" +
                (( idx > 0 ) ? name.substring(0, idx) : name) + "</a>" +
                (( alive ) ? "" : "\n") );
     if ( !alive )
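The comment added to dfshealth.jsp above spells out the three names in play: getHostName():getPort() for display (shortened), getHost():getPort() (the IP) for the link title, and getHostName():getInfoPort() for the browse URL. A standalone sketch of how the short display name is derived, paraphrasing the matches/replaceAll logic in the JSP (illustrative only):

// Keep IP-style names as-is; otherwise strip domain components, then
// drop a suffix (e.g. ":50010") that is common across the listed nodes.
public class DisplayName {
    static String shorten(String name, String commonSuffix) {
        if (!name.matches("\\d+\\.\\d+\\.\\d+\\.\\d+.*")) {
            name = name.replaceAll("\\.[^.:]*", "");  // dn1.hadoop.apache.org:50010 -> dn1:50010
        }
        int idx = (commonSuffix != null && name.endsWith(commonSuffix))
            ? name.indexOf(commonSuffix) : -1;
        return (idx > 0) ? name.substring(0, idx) : name;
    }

    public static void main(String[] args) {
        System.out.println(shorten("dn1.hadoop.apache.org:50010", ":50010"));  // dn1
        System.out.println(shorten("192.168.0.5:50010", null));                // 192.168.0.5:50010
    }
}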