
svn merge -c 1354773 from branch-1 for HDFS-3551.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1@1354774 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze · 13 years ago · commit 6a350ca60c

+ 3 - 0
CHANGES.txt

@@ -259,6 +259,9 @@ Release 1.1.0 - unreleased
     HDFS-3522. If a namenode is in safemode, it should throw SafeModeException
     when getBlockLocations has zero locations.  (Brandon Li via szetszwo)
 
+    HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
+    (szetszwo)
+
 Release 1.0.3 - 2012.05.07
 
   NEW FEATURES
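For context, HDFS-3551 concerns WebHDFS's two-step write: the client first asks the namenode where to write, receives an HTTP 307 redirect, then sends the data to the datanode named in the Location header. With this patch the namenode picks a datanode near the client for CREATE instead of a random one. A minimal client-side sketch of step one, assuming security is disabled and a namenode at namenode:50070 (host, port, and path are illustrative):

import java.net.HttpURLConnection;
import java.net.URL;

public class WebHdfsCreateRedirectSketch {
  public static void main(String[] args) throws Exception {
    // Step 1: ask the namenode where to write; inspect the redirect ourselves.
    final URL url = new URL(
        "http://namenode:50070/webhdfs/v1/tmp/foo?op=CREATE");
    final HttpURLConnection conn = (HttpURLConnection)url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setInstanceFollowRedirects(false);
    conn.connect();
    // Expect 307 Temporary Redirect; with this patch, the Location header
    // names a datanode near the client when one exists.
    System.out.println(conn.getResponseCode()
        + " Location: " + conn.getHeaderField("Location"));
    conn.disconnect();
    // Step 2 (not shown): PUT the file bytes to the Location URL.
  }
}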

+ 19 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3100,6 +3100,20 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
     return true;
   }
 
+  /** Choose a datanode near the given address. */
+  public DatanodeInfo chooseDatanode(String address, long blocksize) {
+    final DatanodeDescriptor clientNode = host2DataNodeMap.getDatanodeByHost(
+        address);
+    if (clientNode != null) {
+      final DatanodeDescriptor[] datanodes = replicator.chooseTarget(
+          1, clientNode, null, blocksize);
+      if (datanodes.length > 0) {
+        return datanodes[0];
+      }
+    }
+    return null;
+  }
+
   /**
    * Parse the data-nodes the block belongs to and choose one,
    * which will be the replication source.
@@ -5929,4 +5943,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
       } 
     }
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ": " + host2DataNodeMap;
+  }
 }
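The new method returns null when the client address does not map to a host in host2DataNodeMap, so callers must fall back to some other datanode; the WebHDFS handler change below does exactly that. A hedged sketch of the calling pattern (the wrapper class is invented for illustration; chooseDatanode is added by this patch and getRandomDatanode is existing FSNamesystem API):

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

// Illustrative helper, not part of the patch.
class NearestDatanodeSketch {
  /** Prefer a datanode co-located with the client; otherwise pick any. */
  static DatanodeInfo choose(FSNamesystem ns, String clientAddr, long blocksize) {
    final DatanodeInfo near = ns.chooseDatanode(clientAddr, blocksize);
    // null means the client host runs no datanode, so locality cannot
    // help and a random datanode is as good as any.
    return near != null ? near : ns.getRandomDatanode();
  }
}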

+ 10 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/Host2NodesMap.java

@@ -185,4 +185,14 @@ class Host2NodesMap {
       hostmapLock.readLock().unlock();
     }
   }
+  
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+        .append("[");
+    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
+      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    }
+    return b.append("\n]").toString();
+  }
 }
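For reference, the new toString prints one line per host followed by its registered datanodes; the output looks roughly like this (hosts and ports invented for illustration):

Host2NodesMap[
  127.0.0.1 => [127.0.0.1:50010]
  192.168.1.2 => [192.168.1.2:50010]
]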

+ 27 - 10
src/hdfs/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.web.JsonUtil;
@@ -107,6 +108,11 @@ public class NamenodeWebHdfsMethods {
     return REMOTE_ADDRESS.get();
   }
 
+  /** Set the remote client address. */
+  static void setRemoteAddress(String remoteAddress) {
+    REMOTE_ADDRESS.set(remoteAddress);
+  }
+
   private @Context ServletContext context;
   private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
@@ -126,12 +132,21 @@ public class NamenodeWebHdfsMethods {
     response.setContentType(null);
   }
 
-  private static DatanodeInfo chooseDatanode(final NameNode namenode,
-      final String path, final HttpOpParam.Op op, final long openOffset
-      ) throws IOException {
-    if (op == GetOpParam.Op.OPEN
+  static DatanodeInfo chooseDatanode(final NameNode namenode,
+      final String path, final HttpOpParam.Op op, final long openOffset,
+      final long blocksize) throws IOException {
+    final FSNamesystem ns = namenode.getNamesystem();
+
+    if (op == PutOpParam.Op.CREATE) {
+      //choose a datanode near the client
+      final DatanodeInfo dn = ns.chooseDatanode(getRemoteAddress(), blocksize);
+      if (dn != null) {
+        return dn;
+      }
+    } else if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
+      //choose a datanode containing a replica
       final HdfsFileStatus status = namenode.getFileInfo(path);
       if (status == null) {
         throw new FileNotFoundException("File " + path + " not found.");
@@ -155,7 +170,7 @@ public class NamenodeWebHdfsMethods {
       }
     } 
 
-    return namenode.getNamesystem().getRandomDatanode();
+    return ns.getRandomDatanode();
   }
 
   private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -173,8 +188,10 @@ public class NamenodeWebHdfsMethods {
       final UserGroupInformation ugi, final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
+      final long blocksize,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
+        blocksize);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {
@@ -302,7 +319,7 @@ public class NamenodeWebHdfsMethods {
     case CREATE:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L,
+          fullpath, op.getValue(), -1L, blockSize.getValue(conf),
           permission, overwrite, bufferSize, replication, blockSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     } 
@@ -425,7 +442,7 @@ public class NamenodeWebHdfsMethods {
     case APPEND:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, bufferSize);
+          fullpath, op.getValue(), -1L, -1L, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
@@ -507,7 +524,7 @@ public class NamenodeWebHdfsMethods {
     case OPEN:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
+          fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
@@ -543,7 +560,7 @@ public class NamenodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L);
+          fullpath, op.getValue(), -1L, -1L);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:

+ 11 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

@@ -17,9 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 import java.io.IOException;
+
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 
 public abstract class NameNodeAdapter {
+  /**
+   * Get block locations within the specified range.
+   */
+  public static LocatedBlocks getBlockLocations(NameNode namenode,
+      String src, long offset, long length) throws IOException {
+    return namenode.getNamesystem().getBlockLocations(
+        src, offset, length, false, true, true);
+  }
+
   public static boolean checkFileProgress(FSNamesystem fsn, String path, boolean checkall) throws IOException {
     INodeFile f = fsn.dir.getFileINode(path);
     return fsn.checkFileProgress(f, checkall);

+ 136 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.web.resources;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.PostOpParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test WebHDFS which provides data locality using HTTP redirection.
+ */
+public class TestWebHdfsDataLocality {
+  static final Log LOG = LogFactory.getLog(TestWebHdfsDataLocality.class);
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
+  }
+  
+  private static final String RACK0 = "/rack0";
+  private static final String RACK1 = "/rack1";
+  private static final String RACK2 = "/rack2";
+
+  @Test
+  public void testDataLocality() throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
+    final int nDataNodes = racks.length;
+    LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks));
+
+    final MiniDFSCluster cluster = new MiniDFSCluster(
+        conf, nDataNodes, true, racks);
+    try {
+      cluster.waitActive();
+
+      final DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      final NameNode namenode = cluster.getNameNode();
+      final FSNamesystem ns = namenode.getNamesystem();
+      LOG.info("ns=" + ns);
+  
+      final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+      final String f = "/foo";
+
+      { //test CREATE
+        for(int i = 0; i < nDataNodes; i++) {
+          //set client address to a particular datanode
+          final DataNode dn = cluster.getDataNodes().get(i);
+          final String host = ns.getDatanode(dn.dnRegistration).getHost();
+          NamenodeWebHdfsMethods.setRemoteAddress(host);
+
+          //The chosen datanode must be the same as the client address
+          final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+              namenode, f, PutOpParam.Op.CREATE, -1L, blocksize);
+          Assert.assertEquals(host, chosen.getHost());
+        }
+      }
+  
+      //create a file with one replica.
+      final Path p = new Path(f);
+      final FSDataOutputStream out = dfs.create(p, (short)1);
+      out.write(1);
+      out.close();
+  
+      //get replica location.
+      final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
+          namenode, f, 0, 1);
+      final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
+      Assert.assertEquals(1, lb.size());
+      final DatanodeInfo[] locations = lb.get(0).getLocations();
+      Assert.assertEquals(1, locations.length);
+      final DatanodeInfo expected = locations[0];
+      
+      //For GETFILECHECKSUM, OPEN and APPEND,
+      //the chosen datanode must be the same as the replica location.
+
+      { //test GETFILECHECKSUM
+        final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize);
+        Assert.assertEquals(expected, chosen);
+      }
+  
+      { //test OPEN
+        final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize);
+        Assert.assertEquals(expected, chosen);
+      }
+
+      { //test APPEND
+        final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize);
+        Assert.assertEquals(expected, chosen);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

+ 7 - 0
src/test/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,6 +40,12 @@ import org.junit.Assert;
 public class WebHdfsTestUtil {
   public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);
 
+  public static Configuration createConf() {
+    final Configuration conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    return conf;
+  }
+
   public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
       ) throws IOException, URISyntaxException {
     final String uri = WebHdfsFileSystem.SCHEME  + "://"