|
@@ -0,0 +1,743 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.hdfs.server.namenode;
|
|
|
+
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
|
|
|
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
|
|
+import static junit.framework.Assert.assertTrue;
|
|
|
+import static junit.framework.Assert.fail;
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertFalse;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.security.PrivilegedExceptionAction;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.LinkedList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.FileSystemTestHelper;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
+import org.apache.hadoop.fs.RemoteIterator;
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
+import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
|
+import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
|
+import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
|
+import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPathNameError;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
|
|
|
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
|
|
|
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
|
|
|
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
|
|
|
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
|
|
|
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
|
|
|
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
|
|
|
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
|
|
+import org.apache.hadoop.io.nativeio.NativeIO;
|
|
|
+import org.apache.hadoop.security.UserGroupInformation;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
+import org.apache.hadoop.util.GSet;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Assume;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+import com.google.common.base.Supplier;
|
|
|
+
|
|
|
+public class TestPathBasedCacheRequests {
|
|
|
+ static final Log LOG = LogFactory.getLog(TestPathBasedCacheRequests.class);
|
|
|
+
|
|
|
+ private static final UserGroupInformation unprivilegedUser =
|
|
|
+ UserGroupInformation.createRemoteUser("unprivilegedUser");
|
|
|
+
|
|
|
+ static private Configuration conf;
|
|
|
+ static private MiniDFSCluster cluster;
|
|
|
+ static private DistributedFileSystem dfs;
|
|
|
+ static private NamenodeProtocols proto;
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setup() throws Exception {
|
|
|
+ conf = new HdfsConfiguration();
|
|
|
+ // set low limits here for testing purposes
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES, 2);
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
|
|
+ cluster.waitActive();
|
|
|
+ dfs = cluster.getFileSystem();
|
|
|
+ proto = cluster.getNameNodeRpc();
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void teardown() throws Exception {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=60000)
|
|
|
+ public void testBasicPoolOperations() throws Exception {
|
|
|
+ final String poolName = "pool1";
|
|
|
+ CachePoolInfo info = new CachePoolInfo(poolName).
|
|
|
+ setOwnerName("bob").setGroupName("bobgroup").
|
|
|
+ setMode(new FsPermission((short)0755)).setWeight(150);
|
|
|
+
|
|
|
+ // Add a pool
|
|
|
+ dfs.addCachePool(info);
|
|
|
+
|
|
|
+ // Do some bad addCachePools
|
|
|
+ try {
|
|
|
+ dfs.addCachePool(info);
|
|
|
+ fail("added the pool with the same name twice");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("pool1 already exists", ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.addCachePool(new CachePoolInfo(""));
|
|
|
+ fail("added empty pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.addCachePool(null);
|
|
|
+ fail("added null pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.addCachePool(new CachePoolInfo(""));
|
|
|
+ fail("added empty pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.addCachePool(null);
|
|
|
+ fail("added null pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Modify the pool
|
|
|
+ info.setOwnerName("jane").setGroupName("janegroup")
|
|
|
+ .setMode(new FsPermission((short)0700)).setWeight(314);
|
|
|
+ dfs.modifyCachePool(info);
|
|
|
+
|
|
|
+ // Do some invalid modify pools
|
|
|
+ try {
|
|
|
+ dfs.modifyCachePool(new CachePoolInfo("fool"));
|
|
|
+ fail("modified non-existent cache pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("fool does not exist", ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.modifyCachePool(new CachePoolInfo(""));
|
|
|
+ fail("modified empty pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.modifyCachePool(null);
|
|
|
+ fail("modified null pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.modifyCachePool(new CachePoolInfo(""));
|
|
|
+ fail("modified empty pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.modifyCachePool(null);
|
|
|
+ fail("modified null pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove the pool
|
|
|
+ dfs.removeCachePool(poolName);
|
|
|
+ // Do some bad removePools
|
|
|
+ try {
|
|
|
+ dfs.removeCachePool("pool99");
|
|
|
+ fail("expected to get an exception when " +
|
|
|
+ "removing a non-existent pool.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("can't remove " +
|
|
|
+ "non-existent cache pool", ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.removeCachePool(poolName);
|
|
|
+ Assert.fail("expected to get an exception when " +
|
|
|
+ "removing a non-existent pool.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("can't remove " +
|
|
|
+ "non-existent cache pool", ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.removeCachePool("");
|
|
|
+ fail("removed empty pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dfs.removeCachePool(null);
|
|
|
+ fail("removed null pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.removeCachePool("");
|
|
|
+ fail("removed empty pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.removeCachePool(null);
|
|
|
+ fail("removed null pool");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+
|
|
|
+ info = new CachePoolInfo("pool2");
|
|
|
+ dfs.addCachePool(info);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=60000)
|
|
|
+ public void testCreateAndModifyPools() throws Exception {
|
|
|
+ String poolName = "pool1";
|
|
|
+ String ownerName = "abc";
|
|
|
+ String groupName = "123";
|
|
|
+ FsPermission mode = new FsPermission((short)0755);
|
|
|
+ int weight = 150;
|
|
|
+ dfs.addCachePool(new CachePoolInfo(poolName).
|
|
|
+ setOwnerName(ownerName).setGroupName(groupName).
|
|
|
+ setMode(mode).setWeight(weight));
|
|
|
+
|
|
|
+ RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
|
|
|
+ CachePoolInfo info = iter.next();
|
|
|
+ assertEquals(poolName, info.getPoolName());
|
|
|
+ assertEquals(ownerName, info.getOwnerName());
|
|
|
+ assertEquals(groupName, info.getGroupName());
|
|
|
+
|
|
|
+ ownerName = "def";
|
|
|
+ groupName = "456";
|
|
|
+ mode = new FsPermission((short)0700);
|
|
|
+ weight = 151;
|
|
|
+ dfs.modifyCachePool(new CachePoolInfo(poolName).
|
|
|
+ setOwnerName(ownerName).setGroupName(groupName).
|
|
|
+ setMode(mode).setWeight(weight));
|
|
|
+
|
|
|
+ iter = dfs.listCachePools();
|
|
|
+ info = iter.next();
|
|
|
+ assertEquals(poolName, info.getPoolName());
|
|
|
+ assertEquals(ownerName, info.getOwnerName());
|
|
|
+ assertEquals(groupName, info.getGroupName());
|
|
|
+ assertEquals(mode, info.getMode());
|
|
|
+ assertEquals(Integer.valueOf(weight), info.getWeight());
|
|
|
+
|
|
|
+ dfs.removeCachePool(poolName);
|
|
|
+ iter = dfs.listCachePools();
|
|
|
+ assertFalse("expected no cache pools after deleting pool", iter.hasNext());
|
|
|
+
|
|
|
+ proto.listCachePools(null);
|
|
|
+
|
|
|
+ try {
|
|
|
+ proto.removeCachePool("pool99");
|
|
|
+ Assert.fail("expected to get an exception when " +
|
|
|
+ "removing a non-existent pool.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("can't remove non-existent",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.removeCachePool(poolName);
|
|
|
+ Assert.fail("expected to get an exception when " +
|
|
|
+ "removing a non-existent pool.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ GenericTestUtils.assertExceptionContains("can't remove non-existent",
|
|
|
+ ioe);
|
|
|
+ }
|
|
|
+
|
|
|
+ iter = dfs.listCachePools();
|
|
|
+ assertFalse("expected no cache pools after deleting pool", iter.hasNext());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void validateListAll(
|
|
|
+ RemoteIterator<PathBasedCacheDescriptor> iter,
|
|
|
+ PathBasedCacheDescriptor... descriptors) throws Exception {
|
|
|
+ for (PathBasedCacheDescriptor descriptor: descriptors) {
|
|
|
+ assertTrue("Unexpectedly few elements", iter.hasNext());
|
|
|
+ assertEquals("Unexpected descriptor", descriptor, iter.next());
|
|
|
+ }
|
|
|
+ assertFalse("Unexpectedly many list elements", iter.hasNext());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static PathBasedCacheDescriptor addAsUnprivileged(
|
|
|
+ final PathBasedCacheDirective directive) throws Exception {
|
|
|
+ return unprivilegedUser
|
|
|
+ .doAs(new PrivilegedExceptionAction<PathBasedCacheDescriptor>() {
|
|
|
+ @Override
|
|
|
+ public PathBasedCacheDescriptor run() throws IOException {
|
|
|
+ DistributedFileSystem myDfs =
|
|
|
+ (DistributedFileSystem) FileSystem.get(conf);
|
|
|
+ return myDfs.addPathBasedCacheDirective(directive);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=60000)
|
|
|
+ public void testAddRemoveDirectives() throws Exception {
|
|
|
+ proto.addCachePool(new CachePoolInfo("pool1").
|
|
|
+ setMode(new FsPermission((short)0777)));
|
|
|
+ proto.addCachePool(new CachePoolInfo("pool2").
|
|
|
+ setMode(new FsPermission((short)0777)));
|
|
|
+ proto.addCachePool(new CachePoolInfo("pool3").
|
|
|
+ setMode(new FsPermission((short)0777)));
|
|
|
+ proto.addCachePool(new CachePoolInfo("pool4").
|
|
|
+ setMode(new FsPermission((short)0)));
|
|
|
+
|
|
|
+ PathBasedCacheDirective alpha = new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/alpha")).
|
|
|
+ setPool("pool1").
|
|
|
+ build();
|
|
|
+ PathBasedCacheDirective beta = new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/beta")).
|
|
|
+ setPool("pool2").
|
|
|
+ build();
|
|
|
+ PathBasedCacheDirective delta = new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/delta")).
|
|
|
+ setPool("pool1").
|
|
|
+ build();
|
|
|
+
|
|
|
+ PathBasedCacheDescriptor alphaD = addAsUnprivileged(alpha);
|
|
|
+ PathBasedCacheDescriptor alphaD2 = addAsUnprivileged(alpha);
|
|
|
+ assertFalse("Expected to get unique descriptors when re-adding an "
|
|
|
+ + "existing PathBasedCacheDirective",
|
|
|
+ alphaD.getEntryId() == alphaD2.getEntryId());
|
|
|
+ PathBasedCacheDescriptor betaD = addAsUnprivileged(beta);
|
|
|
+
|
|
|
+ try {
|
|
|
+ addAsUnprivileged(new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/unicorn")).
|
|
|
+ setPool("no_such_pool").
|
|
|
+ build());
|
|
|
+ fail("expected an error when adding to a non-existent pool.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ assertTrue(ioe instanceof InvalidPoolNameError);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ addAsUnprivileged(new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/blackhole")).
|
|
|
+ setPool("pool4").
|
|
|
+ build());
|
|
|
+ fail("expected an error when adding to a pool with " +
|
|
|
+ "mode 0 (no permissions for anyone).");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ assertTrue(ioe instanceof PoolWritePermissionDeniedError);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ addAsUnprivileged(new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/illegal:path/")).
|
|
|
+ setPool("pool1").
|
|
|
+ build());
|
|
|
+ fail("expected an error when adding a malformed path " +
|
|
|
+ "to the cache directives.");
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
+ // expected
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ addAsUnprivileged(new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/emptypoolname")).
|
|
|
+ setReplication((short)1).
|
|
|
+ setPool("").
|
|
|
+ build());
|
|
|
+ Assert.fail("expected an error when adding a PathBasedCache " +
|
|
|
+ "directive with an empty pool name.");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ Assert.assertTrue(ioe instanceof InvalidPoolNameError);
|
|
|
+ }
|
|
|
+
|
|
|
+ PathBasedCacheDescriptor deltaD = addAsUnprivileged(delta);
|
|
|
+
|
|
|
+ // We expect the following to succeed, because DistributedFileSystem
|
|
|
+ // qualifies the path.
|
|
|
+ PathBasedCacheDescriptor relativeD = addAsUnprivileged(
|
|
|
+ new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("relative")).
|
|
|
+ setPool("pool1").
|
|
|
+ build());
|
|
|
+
|
|
|
+ RemoteIterator<PathBasedCacheDescriptor> iter;
|
|
|
+ iter = dfs.listPathBasedCacheDescriptors(null, null);
|
|
|
+ validateListAll(iter, alphaD, alphaD2, betaD, deltaD, relativeD);
|
|
|
+ iter = dfs.listPathBasedCacheDescriptors("pool3", null);
|
|
|
+ Assert.assertFalse(iter.hasNext());
|
|
|
+ iter = dfs.listPathBasedCacheDescriptors("pool1", null);
|
|
|
+ validateListAll(iter, alphaD, alphaD2, deltaD, relativeD);
|
|
|
+ iter = dfs.listPathBasedCacheDescriptors("pool2", null);
|
|
|
+ validateListAll(iter, betaD);
|
|
|
+
|
|
|
+ dfs.removePathBasedCacheDescriptor(betaD);
|
|
|
+ iter = dfs.listPathBasedCacheDescriptors("pool2", null);
|
|
|
+ Assert.assertFalse(iter.hasNext());
|
|
|
+
|
|
|
+ try {
|
|
|
+ dfs.removePathBasedCacheDescriptor(betaD);
|
|
|
+ Assert.fail("expected an error when removing a non-existent ID");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ Assert.assertTrue(ioe instanceof NoSuchIdException);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ proto.removePathBasedCacheDescriptor(-42l);
|
|
|
+ Assert.fail("expected an error when removing a negative ID");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ Assert.assertTrue(ioe instanceof InvalidIdException);
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ proto.removePathBasedCacheDescriptor(43l);
|
|
|
+ Assert.fail("expected an error when removing a non-existent ID");
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ Assert.assertTrue(ioe instanceof NoSuchIdException);
|
|
|
+ }
|
|
|
+
|
|
|
+ dfs.removePathBasedCacheDescriptor(alphaD);
|
|
|
+ dfs.removePathBasedCacheDescriptor(alphaD2);
|
|
|
+ dfs.removePathBasedCacheDescriptor(deltaD);
|
|
|
+ dfs.removePathBasedCacheDescriptor(relativeD);
|
|
|
+ iter = dfs.listPathBasedCacheDescriptors(null, null);
|
|
|
+ assertFalse(iter.hasNext());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=60000)
|
|
|
+ public void testCacheManagerRestart() throws Exception {
|
|
|
+ cluster.shutdown();
|
|
|
+ cluster = null;
|
|
|
+ HdfsConfiguration conf = createCachingConf();
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
|
|
+
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
+
|
|
|
+ // Create and validate a pool
|
|
|
+ final String pool = "poolparty";
|
|
|
+ String groupName = "partygroup";
|
|
|
+ FsPermission mode = new FsPermission((short)0777);
|
|
|
+ int weight = 747;
|
|
|
+ dfs.addCachePool(new CachePoolInfo(pool)
|
|
|
+ .setGroupName(groupName)
|
|
|
+ .setMode(mode)
|
|
|
+ .setWeight(weight));
|
|
|
+ RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
|
|
|
+ assertTrue("No cache pools found", pit.hasNext());
|
|
|
+ CachePoolInfo info = pit.next();
|
|
|
+ assertEquals(pool, info.getPoolName());
|
|
|
+ assertEquals(groupName, info.getGroupName());
|
|
|
+ assertEquals(mode, info.getMode());
|
|
|
+ assertEquals(weight, (int)info.getWeight());
|
|
|
+ assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
|
|
+
|
|
|
+ // Create some cache entries
|
|
|
+ int numEntries = 10;
|
|
|
+ String entryPrefix = "/party-";
|
|
|
+ for (int i=0; i<numEntries; i++) {
|
|
|
+ dfs.addPathBasedCacheDirective(
|
|
|
+ new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path(entryPrefix + i)).setPool(pool).build());
|
|
|
+ }
|
|
|
+ RemoteIterator<PathBasedCacheDescriptor> dit
|
|
|
+ = dfs.listPathBasedCacheDescriptors(null, null);
|
|
|
+ for (int i=0; i<numEntries; i++) {
|
|
|
+ assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
|
|
+ PathBasedCacheDescriptor cd = dit.next();
|
|
|
+ assertEquals(i+1, cd.getEntryId());
|
|
|
+ assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
|
|
+ assertEquals(pool, cd.getPool());
|
|
|
+ }
|
|
|
+ assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
|
|
|
+
|
|
|
+ // Restart namenode
|
|
|
+ cluster.restartNameNode();
|
|
|
+
|
|
|
+ // Check that state came back up
|
|
|
+ pit = dfs.listCachePools();
|
|
|
+ assertTrue("No cache pools found", pit.hasNext());
|
|
|
+ info = pit.next();
|
|
|
+ assertEquals(pool, info.getPoolName());
|
|
|
+ assertEquals(pool, info.getPoolName());
|
|
|
+ assertEquals(groupName, info.getGroupName());
|
|
|
+ assertEquals(mode, info.getMode());
|
|
|
+ assertEquals(weight, (int)info.getWeight());
|
|
|
+ assertFalse("Unexpected # of cache pools found", pit.hasNext());
|
|
|
+
|
|
|
+ dit = dfs.listPathBasedCacheDescriptors(null, null);
|
|
|
+ for (int i=0; i<numEntries; i++) {
|
|
|
+ assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
|
|
|
+ PathBasedCacheDescriptor cd = dit.next();
|
|
|
+ assertEquals(i+1, cd.getEntryId());
|
|
|
+ assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
|
|
|
+ assertEquals(pool, cd.getPool());
|
|
|
+ }
|
|
|
+ assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void waitForCachedBlocks(NameNode nn,
|
|
|
+ final int expectedCachedBlocks, final int expectedCachedReplicas)
|
|
|
+ throws Exception {
|
|
|
+ final FSNamesystem namesystem = nn.getNamesystem();
|
|
|
+ final CacheManager cacheManager = namesystem.getCacheManager();
|
|
|
+ LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
|
|
|
+ expectedCachedReplicas + " replicas.");
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ int numCachedBlocks = 0, numCachedReplicas = 0;
|
|
|
+ namesystem.readLock();
|
|
|
+ try {
|
|
|
+ GSet<CachedBlock, CachedBlock> cachedBlocks =
|
|
|
+ cacheManager.getCachedBlocks();
|
|
|
+ if (cachedBlocks != null) {
|
|
|
+ for (Iterator<CachedBlock> iter = cachedBlocks.iterator();
|
|
|
+ iter.hasNext(); ) {
|
|
|
+ CachedBlock cachedBlock = iter.next();
|
|
|
+ numCachedBlocks++;
|
|
|
+ numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ namesystem.readUnlock();
|
|
|
+ }
|
|
|
+ if ((numCachedBlocks == expectedCachedBlocks) &&
|
|
|
+ (numCachedReplicas == expectedCachedReplicas)) {
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ LOG.info("cached blocks: have " + numCachedBlocks +
|
|
|
+ " / " + expectedCachedBlocks);
|
|
|
+ LOG.info("cached replicas: have " + numCachedReplicas +
|
|
|
+ " / " + expectedCachedReplicas);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }, 500, 60000);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final long BLOCK_SIZE = 512;
|
|
|
+ private static final int NUM_DATANODES = 4;
|
|
|
+
|
|
|
+ // Most Linux installs will allow non-root users to lock 64KB.
|
|
|
+ private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return true if we can test DN caching.
|
|
|
+ */
|
|
|
+ private static boolean canTestDatanodeCaching() {
|
|
|
+ if (!NativeIO.isAvailable()) {
|
|
|
+ // Need NativeIO in order to cache blocks on the DN.
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (NativeIO.getMemlockLimit() < CACHE_CAPACITY) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static HdfsConfiguration createCachingConf() {
|
|
|
+ HdfsConfiguration conf = new HdfsConfiguration();
|
|
|
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
|
|
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
|
|
|
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
|
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
|
|
|
+ conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
|
|
|
+ conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=120000)
|
|
|
+ public void testWaitForCachedReplicas() throws Exception {
|
|
|
+ Assume.assumeTrue(canTestDatanodeCaching());
|
|
|
+ HdfsConfiguration conf = createCachingConf();
|
|
|
+ FileSystemTestHelper helper = new FileSystemTestHelper();
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
+ NameNode namenode = cluster.getNameNode();
|
|
|
+ NamenodeProtocols nnRpc = namenode.getRpcServer();
|
|
|
+ Path rootDir = helper.getDefaultWorkingDirectory(dfs);
|
|
|
+ // Create the pool
|
|
|
+ final String pool = "friendlyPool";
|
|
|
+ nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
|
|
|
+ // Create some test files
|
|
|
+ final int numFiles = 2;
|
|
|
+ final int numBlocksPerFile = 2;
|
|
|
+ final List<String> paths = new ArrayList<String>(numFiles);
|
|
|
+ for (int i=0; i<numFiles; i++) {
|
|
|
+ Path p = new Path(rootDir, "testCachePaths-" + i);
|
|
|
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
|
|
|
+ (int)BLOCK_SIZE);
|
|
|
+ paths.add(p.toUri().getPath());
|
|
|
+ }
|
|
|
+ // Check the initial statistics at the namenode
|
|
|
+ waitForCachedBlocks(namenode, 0, 0);
|
|
|
+ // Cache and check each path in sequence
|
|
|
+ int expected = 0;
|
|
|
+ for (int i=0; i<numFiles; i++) {
|
|
|
+ PathBasedCacheDirective directive =
|
|
|
+ new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path(paths.get(i))).
|
|
|
+ setPool(pool).
|
|
|
+ build();
|
|
|
+ PathBasedCacheDescriptor descriptor =
|
|
|
+ nnRpc.addPathBasedCacheDirective(directive);
|
|
|
+ assertEquals("Descriptor does not match requested path",
|
|
|
+ new Path(paths.get(i)), descriptor.getPath());
|
|
|
+ assertEquals("Descriptor does not match requested pool", pool,
|
|
|
+ descriptor.getPool());
|
|
|
+ expected += numBlocksPerFile;
|
|
|
+ waitForCachedBlocks(namenode, expected, expected);
|
|
|
+ }
|
|
|
+ // Uncache and check each path in sequence
|
|
|
+ RemoteIterator<PathBasedCacheDescriptor> entries =
|
|
|
+ nnRpc.listPathBasedCacheDescriptors(0, null, null);
|
|
|
+ for (int i=0; i<numFiles; i++) {
|
|
|
+ PathBasedCacheDescriptor descriptor = entries.next();
|
|
|
+ nnRpc.removePathBasedCacheDescriptor(descriptor.getEntryId());
|
|
|
+ expected -= numBlocksPerFile;
|
|
|
+ waitForCachedBlocks(namenode, expected, expected);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=120000)
|
|
|
+ public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
|
|
|
+ throws Exception {
|
|
|
+ Assume.assumeTrue(canTestDatanodeCaching());
|
|
|
+ HdfsConfiguration conf = createCachingConf();
|
|
|
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
+ NameNode namenode = cluster.getNameNode();
|
|
|
+ // Create the pool
|
|
|
+ String pool = "pool1";
|
|
|
+ namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
|
|
|
+ // Create some test files
|
|
|
+ final int numFiles = 2;
|
|
|
+ final int numBlocksPerFile = 2;
|
|
|
+ final List<String> paths = new ArrayList<String>(numFiles);
|
|
|
+ for (int i=0; i<numFiles; i++) {
|
|
|
+ Path p = new Path("/testCachePaths-" + i);
|
|
|
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
|
|
|
+ (int)BLOCK_SIZE);
|
|
|
+ paths.add(p.toUri().getPath());
|
|
|
+ }
|
|
|
+ // Check the initial statistics at the namenode
|
|
|
+ waitForCachedBlocks(namenode, 0, 0);
|
|
|
+ // Cache and check each path in sequence
|
|
|
+ int expected = 0;
|
|
|
+ for (int i=0; i<numFiles; i++) {
|
|
|
+ PathBasedCacheDirective directive =
|
|
|
+ new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path(paths.get(i))).
|
|
|
+ setPool(pool).
|
|
|
+ build();
|
|
|
+ dfs.addPathBasedCacheDirective(directive);
|
|
|
+ waitForCachedBlocks(namenode, expected, 0);
|
|
|
+ }
|
|
|
+ Thread.sleep(20000);
|
|
|
+ waitForCachedBlocks(namenode, expected, 0);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout=120000)
|
|
|
+ public void testWaitForCachedReplicasInDirectory() throws Exception {
|
|
|
+ Assume.assumeTrue(canTestDatanodeCaching());
|
|
|
+ HdfsConfiguration conf = createCachingConf();
|
|
|
+ MiniDFSCluster cluster =
|
|
|
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster.waitActive();
|
|
|
+ DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
+ NameNode namenode = cluster.getNameNode();
|
|
|
+ // Create the pool
|
|
|
+ final String pool = "friendlyPool";
|
|
|
+ dfs.addCachePool(new CachePoolInfo(pool));
|
|
|
+ // Create some test files
|
|
|
+ final List<Path> paths = new LinkedList<Path>();
|
|
|
+ paths.add(new Path("/foo/bar"));
|
|
|
+ paths.add(new Path("/foo/baz"));
|
|
|
+ paths.add(new Path("/foo2/bar2"));
|
|
|
+ paths.add(new Path("/foo2/baz2"));
|
|
|
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
|
|
|
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
|
|
|
+ final int numBlocksPerFile = 2;
|
|
|
+ for (Path path : paths) {
|
|
|
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
|
|
|
+ (int)BLOCK_SIZE, (short)3, false);
|
|
|
+ }
|
|
|
+ waitForCachedBlocks(namenode, 0, 0);
|
|
|
+ // cache entire directory
|
|
|
+ PathBasedCacheDescriptor descriptor = dfs.addPathBasedCacheDirective(
|
|
|
+ new PathBasedCacheDirective.Builder().
|
|
|
+ setPath(new Path("/foo")).
|
|
|
+ setReplication((short)2).
|
|
|
+ setPool(pool).
|
|
|
+ build());
|
|
|
+ assertEquals("Descriptor does not match requested pool", pool,
|
|
|
+ descriptor.getPool());
|
|
|
+ waitForCachedBlocks(namenode, 4, 8);
|
|
|
+ // remove and watch numCached go to 0
|
|
|
+ dfs.removePathBasedCacheDescriptor(descriptor);
|
|
|
+ waitForCachedBlocks(namenode, 0, 0);
|
|
|
+ } finally {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|