HDFS-5450. Better API for getting the cached block locations. Contributed by Andrew Wang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541338 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang, 11 years ago
parent commit 9d06631719
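
The centerpiece is the new BlockLocation.getCachedHosts() accessor, which reports the datanodes holding a cached replica of a block and is wired through DFSUtil, WebHDFS JSON, and HdfsBlockLocation. A minimal client-side sketch, assuming a reachable HDFS; the path is illustrative:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CachedHostsExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus stat = fs.getFileStatus(new Path("/foo/bar")); // illustrative path
    BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
    for (BlockLocation loc : locs) {
      // getHosts() lists every datanode with a replica; getCachedHosts() lists
      // only those with an in-memory cached replica (empty when none is cached).
      System.out.println(loc.getOffset() + ": cached on "
          + Arrays.toString(loc.getCachedHosts()));
    }
  }
}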

+ 66 - 35
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BlockLocation.java

@@ -31,17 +31,33 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Stable
 public class BlockLocation {
   private String[] hosts; // Datanode hostnames
+  private String[] cachedHosts; // Datanode hostnames with a cached replica
   private String[] names; // Datanode IP:xferPort for accessing the block
   private String[] topologyPaths; // Full path name in network topology
   private long offset;  // Offset of the block in the file
   private long length;
   private boolean corrupt;
 
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
   /**
    * Default Constructor
    */
   public BlockLocation() {
-    this(new String[0], new String[0],  0L, 0L);
+    this(EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, 0L, 0L);
+  }
+
+  /**
+   * Copy constructor
+   */
+  public BlockLocation(BlockLocation that) {
+    this.hosts = that.hosts;
+    this.cachedHosts = that.cachedHosts;
+    this.names = that.names;
+    this.topologyPaths = that.topologyPaths;
+    this.offset = that.offset;
+    this.length = that.length;
+    this.corrupt = that.corrupt;
   }
 
   /**
@@ -57,20 +73,7 @@ public class BlockLocation {
    */
   public BlockLocation(String[] names, String[] hosts, long offset, 
                        long length, boolean corrupt) {
-    if (names == null) {
-      this.names = new String[0];
-    } else {
-      this.names = names;
-    }
-    if (hosts == null) {
-      this.hosts = new String[0];
-    } else {
-      this.hosts = hosts;
-    }
-    this.offset = offset;
-    this.length = length;
-    this.topologyPaths = new String[0];
-    this.corrupt = corrupt;
+    this(names, hosts, null, offset, length, corrupt);
   }
 
   /**
@@ -87,34 +90,55 @@ public class BlockLocation {
    */
   public BlockLocation(String[] names, String[] hosts, String[] topologyPaths,
                        long offset, long length, boolean corrupt) {
-    this(names, hosts, offset, length, corrupt);
+    this(names, hosts, null, topologyPaths, offset, length, corrupt);
+  }
+
+  public BlockLocation(String[] names, String[] hosts, String[] cachedHosts,
+      String[] topologyPaths, long offset, long length, boolean corrupt) {
+    if (names == null) {
+      this.names = EMPTY_STR_ARRAY;
+    } else {
+      this.names = names;
+    }
+    if (hosts == null) {
+      this.hosts = EMPTY_STR_ARRAY;
+    } else {
+      this.hosts = hosts;
+    }
+    if (cachedHosts == null) {
+      this.cachedHosts = EMPTY_STR_ARRAY;
+    } else {
+      this.cachedHosts = cachedHosts;
+    }
     if (topologyPaths == null) {
-      this.topologyPaths = new String[0];
+      this.topologyPaths = EMPTY_STR_ARRAY;
     } else {
       this.topologyPaths = topologyPaths;
     }
+    this.offset = offset;
+    this.length = length;
+    this.corrupt = corrupt;
   }
 
   /**
    * Get the list of hosts (hostname) hosting this block
    */
   public String[] getHosts() throws IOException {
-    if (hosts == null || hosts.length == 0) {
-      return new String[0];
-    } else {
-      return hosts;
-    }
+    return hosts;
+  }
+
+  /**
+   * Get the list of hosts (hostname) hosting a cached replica of the block
+   */
+  public String[] getCachedHosts() {
+    return cachedHosts;
   }
 
   /**
    * Get the list of names (IP:xferPort) hosting this block
    */
   public String[] getNames() throws IOException {
-    if (names == null || names.length == 0) {
-      return new String[0];
-    } else {
-      return names;
-    }
+    return names;
   }
 
   /**
@@ -122,11 +146,7 @@ public class BlockLocation {
    * The last component of the path is the "name" (IP:xferPort).
    */
   public String[] getTopologyPaths() throws IOException {
-    if (topologyPaths == null || topologyPaths.length == 0) {
-      return new String[0];
-    } else {
-      return topologyPaths;
-    }
+    return topologyPaths;
   }
   
   /**
@@ -176,18 +196,29 @@ public class BlockLocation {
    */
   public void setHosts(String[] hosts) throws IOException {
     if (hosts == null) {
-      this.hosts = new String[0];
+      this.hosts = EMPTY_STR_ARRAY;
     } else {
       this.hosts = hosts;
     }
   }
 
+  /**
+   * Set the hosts hosting a cached replica of this block
+   */
+  public void setCachedHosts(String[] cachedHosts) {
+    if (cachedHosts == null) {
+      this.cachedHosts = EMPTY_STR_ARRAY;
+    } else {
+      this.cachedHosts = cachedHosts;
+    }
+  }
+
   /**
    * Set the names (host:port) hosting this block
    */
   public void setNames(String[] names) throws IOException {
     if (names == null) {
-      this.names = new String[0];
+      this.names = EMPTY_STR_ARRAY;
     } else {
       this.names = names;
     }
@@ -198,7 +229,7 @@ public class BlockLocation {
    */
   public void setTopologyPaths(String[] topologyPaths) throws IOException {
     if (topologyPaths == null) {
-      this.topologyPaths = new String[0];
+      this.topologyPaths = EMPTY_STR_ARRAY;
     } else {
       this.topologyPaths = topologyPaths;
     }
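
With every constructor now delegating to the seven-argument form, null normalization happens exactly once and the getters can return their fields directly. A small sketch of the resulting behavior (offset and length values are made up):

// Null array arguments become EMPTY_STR_ARRAY at construction time,
// so none of the array getters can return null.
BlockLocation loc = new BlockLocation(null, null, null, null, 0L, 128L, false);
assert loc.getNames().length == 0;       // was null, now an empty array
assert loc.getCachedHosts().length == 0; // the new field gets the same treatment
BlockLocation copy = new BlockLocation(loc); // new copy constructor, used by HdfsBlockLocation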

+ 108 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestBlockLocation.java

@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Test;
+
+public class TestBlockLocation {
+
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
+  private static void checkBlockLocation(final BlockLocation loc)
+      throws Exception {
+    checkBlockLocation(loc, 0, 0, false);
+  }
+
+  private static void checkBlockLocation(final BlockLocation loc,
+      final long offset, final long length, final boolean corrupt)
+      throws Exception {
+    checkBlockLocation(loc, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY, EMPTY_STR_ARRAY,
+        EMPTY_STR_ARRAY, offset, length, corrupt);
+  }
+
+  private static void checkBlockLocation(final BlockLocation loc,
+      String[] names, String[] hosts, String[] cachedHosts,
+      String[] topologyPaths, final long offset, final long length,
+      final boolean corrupt) throws Exception {
+    assertNotNull(loc.getHosts());
+    assertNotNull(loc.getCachedHosts());
+    assertNotNull(loc.getNames());
+    assertNotNull(loc.getTopologyPaths());
+
+    assertArrayEquals(hosts, loc.getHosts());
+    assertArrayEquals(cachedHosts, loc.getCachedHosts());
+    assertArrayEquals(names, loc.getNames());
+    assertArrayEquals(topologyPaths, loc.getTopologyPaths());
+
+    assertEquals(offset, loc.getOffset());
+    assertEquals(length, loc.getLength());
+    assertEquals(corrupt, loc.isCorrupt());
+  }
+
+  /**
+   * Call all the constructors and verify the delegation is working properly
+   */
+  @Test(timeout = 5000)
+  public void testBlockLocationConstructors() throws Exception {
+    // Exercise each constructor, starting from the no-arg default
+    BlockLocation loc;
+    loc = new BlockLocation();
+    checkBlockLocation(loc);
+    loc = new BlockLocation(null, null, 1, 2);
+    checkBlockLocation(loc, 1, 2, false);
+    loc = new BlockLocation(null, null, null, 1, 2);
+    checkBlockLocation(loc, 1, 2, false);
+    loc = new BlockLocation(null, null, null, 1, 2, true);
+    checkBlockLocation(loc, 1, 2, true);
+    loc = new BlockLocation(null, null, null, null, 1, 2, true);
+    checkBlockLocation(loc, 1, 2, true);
+  }
+
+  /**
+   * Call each of the setters and verify that the fields are set
+   */
+  @Test(timeout = 5000)
+  public void testBlockLocationSetters() throws Exception {
+    BlockLocation loc;
+    loc = new BlockLocation();
+    // Test that null sets an empty array
+    loc.setHosts(null);
+    loc.setCachedHosts(null);
+    loc.setNames(null);
+    loc.setTopologyPaths(null);
+    checkBlockLocation(loc);
+    // Test that non-null values are set properly
+    String[] names = new String[] { "name" };
+    String[] hosts = new String[] { "host" };
+    String[] cachedHosts = new String[] { "cachedHost" };
+    String[] topologyPaths = new String[] { "path" };
+    loc.setNames(names);
+    loc.setHosts(hosts);
+    loc.setCachedHosts(cachedHosts);
+    loc.setTopologyPaths(topologyPaths);
+    loc.setOffset(1);
+    loc.setLength(2);
+    loc.setCorrupt(true);
+    checkBlockLocation(loc, names, hosts, cachedHosts, topologyPaths, 1, 2,
+        true);
+  }
+}

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -190,6 +190,8 @@ Trunk (Unreleased)
 
     HDFS-5326. add modifyDirective to cacheAdmin.  (cmccabe)
 
+    HDFS-5450. Better API for getting the cached block locations. (wang)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java

@@ -37,8 +37,7 @@ public class HdfsBlockLocation extends BlockLocation {
   public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) 
       throws IOException {
     // Initialize with data from passed in BlockLocation
-    super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(), 
-        loc.getOffset(), loc.getLength(), loc.isCorrupt());
+    super(loc);
     this.block = block;
   }
   

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -419,7 +419,13 @@ public class DFSUtil {
                                      locations[hCnt].getNetworkLocation());
         racks[hCnt] = node.toString();
       }
-      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks,
+      DatanodeInfo[] cachedLocations = blk.getCachedLocations();
+      String[] cachedHosts = new String[cachedLocations.length];
+      for (int i=0; i<cachedLocations.length; i++) {
+        cachedHosts[i] = cachedLocations[i].getHostName();
+      }
+      blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
+                                            racks,
                                             blk.getStartOffset(),
                                             blk.getBlockSize(),
                                             blk.isCorrupt());

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java

@@ -393,6 +393,7 @@ public class JsonUtil {
     m.put("startOffset", locatedblock.getStartOffset());
     m.put("block", toJsonMap(locatedblock.getBlock()));
     m.put("locations", toJsonArray(locatedblock.getLocations()));
+    m.put("cachedLocations", toJsonArray(locatedblock.getCachedLocations()));
     return m;
   }
 
@@ -407,8 +408,11 @@ public class JsonUtil {
         (Object[])m.get("locations"));
     final long startOffset = (Long)m.get("startOffset");
     final boolean isCorrupt = (Boolean)m.get("isCorrupt");
+    final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
+        (Object[])m.get("cachedLocations"));
 
-    final LocatedBlock locatedblock = new LocatedBlock(b, locations, startOffset, isCorrupt);
+    final LocatedBlock locatedblock = new LocatedBlock(b, locations,
+        startOffset, isCorrupt, cachedLocations);
     locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
     return locatedblock;
   }
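
On the wire, the WebHDFS representation of a located block now carries a cachedLocations array next to locations, and toLocatedBlock() reads the same key back into the five-argument LocatedBlock constructor. A sketch of the resulting JSON shape, limited to the keys visible in this change, with the DatanodeInfo maps elided:

{
  "block"           : { ... },
  "blockToken"      : { ... },
  "startOffset"     : 0,
  "isCorrupt"       : false,
  "locations"       : [ ... ],
  "cachedLocations" : [ ... ]
}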

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -87,6 +88,8 @@ public class TestFsDatasetCache {
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
   private static PageRounder rounder = new PageRounder();
 
+  private Mlocker mlocker;
+
   @Before
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
@@ -110,6 +113,8 @@ public class TestFsDatasetCache {
     fsd = dn.getFSDataset();
 
     spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
+    // Save the current mlocker and restore it after the test
+    mlocker = MappableBlock.mlocker;
   }
 
   @After
@@ -120,6 +125,8 @@ public class TestFsDatasetCache {
     if (cluster != null) {
       cluster.shutdown();
     }
+    // Restore the original mlocker
+    MappableBlock.mlocker = mlocker;
   }
 
   private static void setHeartbeatResponse(DatanodeCommand[] cmds)

+ 124 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java

@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.nio.MappedByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -40,6 +41,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.InvalidRequestException;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.security.AccessControlException;
@@ -78,6 +82,15 @@ public class TestPathBasedCacheRequests {
   static private DistributedFileSystem dfs;
   static private NamenodeProtocols proto;
 
+  static {
+    MappableBlock.mlocker = new MappableBlock.Mlocker() {
+      @Override
+      public void mlock(MappedByteBuffer mmap, long length) throws IOException {
+        // Stubbed out for testing
+      }
+    };
+  }
+
   @Before
   public void setup() throws Exception {
     conf = new HdfsConfiguration();
@@ -530,6 +543,14 @@ public class TestPathBasedCacheRequests {
     assertFalse("Unexpected # of cache directives found", dit.hasNext());
   }
 
+  /**
+   * Wait for the NameNode to have an expected number of cached blocks
+   * and replicas.
+   * @param nn NameNode
+   * @param expectedCachedBlocks expected number of cached blocks
+   * @param expectedCachedReplicas expected number of cached replicas
+   * @throws Exception
+   */
   private static void waitForCachedBlocks(NameNode nn,
       final int expectedCachedBlocks, final int expectedCachedReplicas) 
           throws Exception {
@@ -570,6 +591,37 @@ public class TestPathBasedCacheRequests {
     }, 500, 60000);
   }
 
+  private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+      final List<Path> paths, final int expectedBlocks,
+      final int expectedReplicas)
+      throws Exception {
+    int numCachedBlocks = 0;
+    int numCachedReplicas = 0;
+    for (Path p: paths) {
+      final FileStatus f = dfs.getFileStatus(p);
+      final long len = f.getLen();
+      final long blockSize = f.getBlockSize();
+      // round it up to full blocks
+      final long numBlocks = (len + blockSize - 1) / blockSize;
+      BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
+      assertEquals("Unexpected number of block locations for path " + p,
+          numBlocks, locs.length);
+      for (BlockLocation l: locs) {
+        if (l.getCachedHosts().length > 0) {
+          numCachedBlocks++;
+        }
+        numCachedReplicas += l.getCachedHosts().length;
+      }
+    }
+    LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
+    LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas
+        + " replicas");
+    assertEquals("Unexpected number of cached blocks", expectedBlocks,
+        numCachedBlocks);
+    assertEquals("Unexpected number of cached replicas", expectedReplicas,
+        numCachedReplicas);
+  }
+
   private static final long BLOCK_SIZE = 512;
   private static final int NUM_DATANODES = 4;
 
@@ -746,6 +798,78 @@ public class TestPathBasedCacheRequests {
     }
   }
 
+  /**
+   * Tests stepping the cache replication factor up and down, checking the
+   * number of cached replicas and blocks as well as the advertised locations.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testReplicationFactor() throws Exception {
+    Assume.assumeTrue(canTestDatanodeCaching());
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+    try {
+      cluster.waitActive();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      NameNode namenode = cluster.getNameNode();
+      // Create the pool
+      final String pool = "friendlyPool";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      // Create some test files
+      final List<Path> paths = new LinkedList<Path>();
+      paths.add(new Path("/foo/bar"));
+      paths.add(new Path("/foo/baz"));
+      paths.add(new Path("/foo2/bar2"));
+      paths.add(new Path("/foo2/baz2"));
+      dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+      dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+      final int numBlocksPerFile = 2;
+      for (Path path : paths) {
+        FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+            (int)BLOCK_SIZE, (short)3, false);
+      }
+      waitForCachedBlocks(namenode, 0, 0);
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+      // cache directory
+      long id = dfs.addPathBasedCacheDirective(
+          new PathBasedCacheDirective.Builder().
+            setPath(new Path("/foo")).
+            setReplication((short)1).
+            setPool(pool).
+            build());
+      waitForCachedBlocks(namenode, 4, 4);
+      checkNumCachedReplicas(dfs, paths, 4, 4);
+      // step up the replication factor
+      for (int i=2; i<=3; i++) {
+        dfs.modifyPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+            setId(id).
+            setReplication((short)i).
+            build());
+        waitForCachedBlocks(namenode, 4, 4*i);
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // step it down
+      for (int i=2; i>=1; i--) {
+        dfs.modifyPathBasedCacheDirective(
+            new PathBasedCacheDirective.Builder().
+            setId(id).
+            setReplication((short)i).
+            build());
+        waitForCachedBlocks(namenode, 4, 4*i);
+        checkNumCachedReplicas(dfs, paths, 4, 4*i);
+      }
+      // remove and watch numCached go to 0
+      dfs.removePathBasedCacheDirective(id);
+      waitForCachedBlocks(namenode, 0, 0);
+      checkNumCachedReplicas(dfs, paths, 0, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout=60000)
   public void testListCachePoolPermissions() throws Exception {
     final UserGroupInformation myUser = UserGroupInformation