|
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.balancer;
|
|
|
|
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
|
|
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
|
import static org.apache.hadoop.fs.StorageType.DEFAULT;
|
|
-import static org.apache.hadoop.fs.StorageType.RAM_DISK;
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY;
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
|
@@ -28,22 +27,16 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KERBEROS_PRINCIP
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_ENABLED_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_FILE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BALANCER_KEYTAB_FILE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED;
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
|
|
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
|
|
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
|
|
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
|
-import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
|
|
|
|
|
|
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
|
import org.junit.AfterClass;
|
|
import org.junit.AfterClass;
|
|
@@ -56,10 +49,8 @@ import static org.mockito.Mockito.doAnswer;
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.io.OutputStream;
|
|
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
-import java.net.InetSocketAddress;
|
|
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.security.PrivilegedExceptionAction;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
@@ -85,8 +76,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
import org.apache.hadoop.fs.StorageType;
|
|
-import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
|
-import org.apache.hadoop.hdfs.DFSClient;
|
|
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
import org.apache.hadoop.hdfs.DFSUtil;
|
|
@@ -106,14 +95,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
|
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
|
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
|
|
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
|
|
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
|
|
|
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
|
|
|
|
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
|
|
|
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
|
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
|
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
|
@@ -185,7 +169,6 @@ public class TestBalancer {
|
|
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
|
|
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
|
|
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
|
|
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
|
|
static final int DEFAULT_BLOCK_SIZE = 100;
|
|
static final int DEFAULT_BLOCK_SIZE = 100;
|
|
- static final int DEFAULT_RAM_DISK_BLOCK_SIZE = 5 * 1024 * 1024;
|
|
|
|
private static final Random r = new Random();
|
|
private static final Random r = new Random();
|
|
|
|
|
|
static {
|
|
static {
|
|
@@ -211,20 +194,6 @@ public class TestBalancer {
|
|
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
|
|
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
|
|
}
|
|
}
|
|
|
|
|
|
- static void initConfWithRamDisk(Configuration conf,
|
|
|
|
- long ramDiskCapacity) {
|
|
|
|
- conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
|
|
|
|
- conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
|
|
|
|
- conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
|
|
|
|
- conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
|
|
- conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
|
|
|
- conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
|
|
|
|
- conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
|
|
|
|
- LazyPersistTestCase.initCacheManipulator();
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private final ErasureCodingPolicy ecPolicy =
|
|
private final ErasureCodingPolicy ecPolicy =
|
|
StripedFileTestUtil.getDefaultECPolicy();
|
|
StripedFileTestUtil.getDefaultECPolicy();
|
|
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
|
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
|
@@ -485,160 +454,6 @@ public class TestBalancer {
|
|
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
|
|
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Make sure that balancer can't move pinned blocks.
|
|
|
|
- * If specified favoredNodes when create file, blocks will be pinned use
|
|
|
|
- * sticky bit.
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
- @Test(timeout=100000)
|
|
|
|
- public void testBalancerWithPinnedBlocks() throws Exception {
|
|
|
|
- // This test assumes stick-bit based block pin mechanism available only
|
|
|
|
- // in Linux/Unix. It can be unblocked on Windows when HDFS-7759 is ready to
|
|
|
|
- // provide a different mechanism for Windows.
|
|
|
|
- assumeNotWindows();
|
|
|
|
-
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
|
- initConf(conf);
|
|
|
|
- conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
|
|
|
|
-
|
|
|
|
- long[] capacities = new long[] { CAPACITY, CAPACITY };
|
|
|
|
- String[] hosts = {"host0", "host1"};
|
|
|
|
- String[] racks = { RACK0, RACK1 };
|
|
|
|
- int numOfDatanodes = capacities.length;
|
|
|
|
-
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
|
|
|
|
- .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
|
|
|
|
-
|
|
|
|
- cluster.waitActive();
|
|
|
|
- client = NameNodeProxies.createProxy(conf,
|
|
|
|
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
|
|
|
|
-
|
|
|
|
- // fill up the cluster to be 80% full
|
|
|
|
- long totalCapacity = sum(capacities);
|
|
|
|
- long totalUsedSpace = totalCapacity * 8 / 10;
|
|
|
|
- InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
|
|
|
|
- for (int i = 0; i < favoredNodes.length; i++) {
|
|
|
|
- // DFSClient will attempt reverse lookup. In case it resolves
|
|
|
|
- // "127.0.0.1" to "localhost", we manually specify the hostname.
|
|
|
|
- int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
|
|
|
|
- favoredNodes[i] = new InetSocketAddress(hosts[i], port);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
|
|
|
|
- totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
|
|
|
|
- (short) numOfDatanodes, 0, false, favoredNodes);
|
|
|
|
-
|
|
|
|
- // start up an empty node with the same capacity
|
|
|
|
- cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
|
|
|
|
- new long[] { CAPACITY });
|
|
|
|
-
|
|
|
|
- totalCapacity += CAPACITY;
|
|
|
|
-
|
|
|
|
- // run balancer and validate results
|
|
|
|
- waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
|
|
|
-
|
|
|
|
- // start rebalancing
|
|
|
|
- Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
|
|
- int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
|
|
|
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Verify balancer won't violate the default block placement policy.
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
- @Test(timeout=100000)
|
|
|
|
- public void testRackPolicyAfterBalance() throws Exception {
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
|
- initConf(conf);
|
|
|
|
- long[] capacities = new long[] { CAPACITY, CAPACITY };
|
|
|
|
- String[] hosts = {"host0", "host1"};
|
|
|
|
- String[] racks = { RACK0, RACK1 };
|
|
|
|
- runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
|
|
|
|
- null, CAPACITY, "host2", RACK1, null);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Verify balancer won't violate upgrade domain block placement policy.
|
|
|
|
- * @throws Exception
|
|
|
|
- */
|
|
|
|
- @Test(timeout=100000)
|
|
|
|
- public void testUpgradeDomainPolicyAfterBalance() throws Exception {
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
|
- initConf(conf);
|
|
|
|
- conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
|
|
|
- BlockPlacementPolicyWithUpgradeDomain.class,
|
|
|
|
- BlockPlacementPolicy.class);
|
|
|
|
- long[] capacities = new long[] { CAPACITY, CAPACITY, CAPACITY };
|
|
|
|
- String[] hosts = {"host0", "host1", "host2"};
|
|
|
|
- String[] racks = { RACK0, RACK1, RACK1 };
|
|
|
|
- String[] UDs = { "ud0", "ud1", "ud2" };
|
|
|
|
- runBalancerAndVerifyBlockPlacmentPolicy(conf, capacities, hosts, racks,
|
|
|
|
- UDs, CAPACITY, "host3", RACK2, "ud2");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void runBalancerAndVerifyBlockPlacmentPolicy(Configuration conf,
|
|
|
|
- long[] capacities, String[] hosts, String[] racks, String[] UDs,
|
|
|
|
- long newCapacity, String newHost, String newRack, String newUD)
|
|
|
|
- throws Exception {
|
|
|
|
- int numOfDatanodes = capacities.length;
|
|
|
|
-
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
|
|
|
|
- .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
|
|
|
|
- DatanodeManager dm = cluster.getNamesystem().getBlockManager().
|
|
|
|
- getDatanodeManager();
|
|
|
|
- if (UDs != null) {
|
|
|
|
- for(int i = 0; i < UDs.length; i++) {
|
|
|
|
- DatanodeID datanodeId = cluster.getDataNodes().get(i).getDatanodeId();
|
|
|
|
- dm.getDatanode(datanodeId).setUpgradeDomain(UDs[i]);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- try {
|
|
|
|
- cluster.waitActive();
|
|
|
|
- client = NameNodeProxies.createProxy(conf,
|
|
|
|
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
|
|
|
|
-
|
|
|
|
- // fill up the cluster to be 80% full
|
|
|
|
- long totalCapacity = sum(capacities);
|
|
|
|
- long totalUsedSpace = totalCapacity * 8 / 10;
|
|
|
|
-
|
|
|
|
- final long fileSize = totalUsedSpace / numOfDatanodes;
|
|
|
|
- DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
|
|
|
|
- fileSize, DEFAULT_BLOCK_SIZE, (short) numOfDatanodes, 0, false);
|
|
|
|
-
|
|
|
|
- // start up an empty node with the same capacity on the same rack as the
|
|
|
|
- // pinned host.
|
|
|
|
- cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
|
|
|
|
- new String[] { newHost }, new long[] { newCapacity });
|
|
|
|
- if (newUD != null) {
|
|
|
|
- DatanodeID newId = cluster.getDataNodes().get(
|
|
|
|
- numOfDatanodes).getDatanodeId();
|
|
|
|
- dm.getDatanode(newId).setUpgradeDomain(newUD);
|
|
|
|
- }
|
|
|
|
- totalCapacity += newCapacity;
|
|
|
|
-
|
|
|
|
- // run balancer and validate results
|
|
|
|
- waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
|
|
|
|
-
|
|
|
|
- // start rebalancing
|
|
|
|
- Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
|
|
- Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
|
|
|
|
- BlockPlacementPolicy placementPolicy =
|
|
|
|
- cluster.getNamesystem().getBlockManager().getBlockPlacementPolicy();
|
|
|
|
- List<LocatedBlock> locatedBlocks = client.
|
|
|
|
- getBlockLocations(fileName, 0, fileSize).getLocatedBlocks();
|
|
|
|
- for (LocatedBlock locatedBlock : locatedBlocks) {
|
|
|
|
- BlockPlacementStatus status = placementPolicy.verifyBlockPlacement(
|
|
|
|
- locatedBlock.getLocations(), numOfDatanodes);
|
|
|
|
- assertTrue(status.isPlacementPolicySatisfied());
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- cluster.shutdown();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Wait until balanced: each datanode gives utilization within
|
|
* Wait until balanced: each datanode gives utilization within
|
|
* BALANCE_ALLOWED_VARIANCE of average
|
|
* BALANCE_ALLOWED_VARIANCE of average
|
|
@@ -1598,144 +1413,6 @@ public class TestBalancer {
|
|
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
|
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- @Test(timeout = 100000)
|
|
|
|
- public void testMaxIterationTime() throws Exception {
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
|
- initConf(conf);
|
|
|
|
- int blockSize = 10*1024*1024; // 10MB block size
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
|
|
|
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
|
|
|
- // limit the worker thread count of Balancer to have only 1 queue per DN
|
|
|
|
- conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
|
|
|
|
- // limit the bandwidth to 4MB per sec to emulate slow block moves
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
|
|
|
|
- 4 * 1024 * 1024);
|
|
|
|
- // set client socket timeout to have an IN_PROGRESS notification back from
|
|
|
|
- // the DataNode about the copy in every second.
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
|
|
|
|
- // set max iteration time to 2 seconds to timeout before moving any block
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
|
|
|
|
- // setup the cluster
|
|
|
|
- final long capacity = 10L * blockSize;
|
|
|
|
- final long[] dnCapacities = new long[] {capacity, capacity};
|
|
|
|
- final short rep = 1;
|
|
|
|
- final long seed = 0xFAFAFA;
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
- .numDataNodes(0)
|
|
|
|
- .build();
|
|
|
|
- try {
|
|
|
|
- cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
|
|
|
- conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
|
|
|
- cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
|
|
|
|
- cluster.waitClusterUp();
|
|
|
|
- cluster.waitActive();
|
|
|
|
- final Path path = new Path("/testMaxIterationTime.dat");
|
|
|
|
- DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
|
- // fill the DN to 40%
|
|
|
|
- DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
|
|
|
|
- // start a new DN
|
|
|
|
- cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
|
|
|
|
- cluster.triggerHeartbeats();
|
|
|
|
- // setup Balancer and run one iteration
|
|
|
|
- List<NameNodeConnector> connectors = Collections.emptyList();
|
|
|
|
- try {
|
|
|
|
- BalancerParameters bParams = BalancerParameters.DEFAULT;
|
|
|
|
- // set maxIdleIterations to 1 for NO_MOVE_PROGRESS to be
|
|
|
|
- // reported when there is no block move
|
|
|
|
- connectors = NameNodeConnector.newNameNodeConnectors(
|
|
|
|
- DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
|
|
|
|
- Balancer.BALANCER_ID_PATH, conf, 1);
|
|
|
|
- for (NameNodeConnector nnc : connectors) {
|
|
|
|
- LOG.info("NNC to work on: " + nnc);
|
|
|
|
- Balancer b = new Balancer(nnc, bParams, conf);
|
|
|
|
- Result r = b.runOneIteration();
|
|
|
|
- // Since no block cannot be moved in 2 seconds (i.e.,
|
|
|
|
- // 4MB/s * 2s = 8MB < 10MB), NO_MOVE_PROGRESS will be reported.
|
|
|
|
- // When a block move is not canceled in 2 seconds properly and then
|
|
|
|
- // a block is moved unexpectedly, IN_PROGRESS will be reported.
|
|
|
|
- assertEquals("We expect ExitStatus.NO_MOVE_PROGRESS to be reported.",
|
|
|
|
- ExitStatus.NO_MOVE_PROGRESS, r.getExitStatus());
|
|
|
|
- assertEquals(0, r.getBlocksMoved());
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- for (NameNodeConnector nnc : connectors) {
|
|
|
|
- IOUtils.cleanupWithLogger(null, nnc);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } finally {
|
|
|
|
- cluster.shutdown(true, true);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /*
|
|
|
|
- * Test Balancer with Ram_Disk configured
|
|
|
|
- * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
|
|
|
|
- * Then verify that the balancer does not migrate files on RAM_DISK across DN.
|
|
|
|
- */
|
|
|
|
- @Test(timeout=300000)
|
|
|
|
- public void testBalancerWithRamDisk() throws Exception {
|
|
|
|
- final int SEED = 0xFADED;
|
|
|
|
- final short REPL_FACT = 1;
|
|
|
|
- Configuration conf = new Configuration();
|
|
|
|
-
|
|
|
|
- final int defaultRamDiskCapacity = 10;
|
|
|
|
- final long ramDiskStorageLimit =
|
|
|
|
- ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
|
|
|
|
- (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
|
|
|
|
- final long diskStorageLimit =
|
|
|
|
- ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
|
|
|
|
- (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
|
|
|
|
-
|
|
|
|
- initConfWithRamDisk(conf, ramDiskStorageLimit);
|
|
|
|
-
|
|
|
|
- cluster = new MiniDFSCluster
|
|
|
|
- .Builder(conf)
|
|
|
|
- .numDataNodes(1)
|
|
|
|
- .storageCapacities(new long[] { ramDiskStorageLimit, diskStorageLimit })
|
|
|
|
- .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
|
|
|
|
- .build();
|
|
|
|
-
|
|
|
|
- cluster.waitActive();
|
|
|
|
- // Create few files on RAM_DISK
|
|
|
|
- final String METHOD_NAME = GenericTestUtils.getMethodName();
|
|
|
|
- final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
|
|
|
- final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
|
|
|
-
|
|
|
|
- DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
|
- DFSClient client = fs.getClient();
|
|
|
|
- DFSTestUtil.createFile(fs, path1, true,
|
|
|
|
- DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
|
|
|
|
- DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
|
|
|
|
- DFSTestUtil.createFile(fs, path2, true,
|
|
|
|
- DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
|
|
|
|
- DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
|
|
|
|
-
|
|
|
|
- // Sleep for a short time to allow the lazy writer thread to do its job
|
|
|
|
- Thread.sleep(6 * 1000);
|
|
|
|
-
|
|
|
|
- // Add another fresh DN with the same type/capacity without files on RAM_DISK
|
|
|
|
- StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
|
|
|
|
- long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
|
|
|
|
- diskStorageLimit}};
|
|
|
|
- cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
|
|
|
|
- null, null, storageCapacities, null, false, false, false, null);
|
|
|
|
-
|
|
|
|
- cluster.triggerHeartbeats();
|
|
|
|
- Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
|
|
-
|
|
|
|
- // Run Balancer
|
|
|
|
- final BalancerParameters p = BalancerParameters.DEFAULT;
|
|
|
|
- final int r = Balancer.run(namenodes, p, conf);
|
|
|
|
-
|
|
|
|
- // Validate no RAM_DISK block should be moved
|
|
|
|
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
|
|
|
-
|
|
|
|
- // Verify files are still on RAM_DISK
|
|
|
|
- DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
|
|
|
|
- DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Check that the balancer exits when there is an unfinalized upgrade.
|
|
* Check that the balancer exits when there is an unfinalized upgrade.
|
|
*/
|
|
*/
|
|
@@ -1798,66 +1475,6 @@ public class TestBalancer {
|
|
Balancer.run(namenodes, p, conf));
|
|
Balancer.run(namenodes, p, conf));
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Test special case. Two replicas belong to same block should not in same node.
|
|
|
|
- * We have 2 nodes.
|
|
|
|
- * We have a block in (DN0,SSD) and (DN1,DISK).
|
|
|
|
- * Replica in (DN0,SSD) should not be moved to (DN1,SSD).
|
|
|
|
- * Otherwise DN1 has 2 replicas.
|
|
|
|
- */
|
|
|
|
- @Test(timeout=100000)
|
|
|
|
- public void testTwoReplicaShouldNotInSameDN() throws Exception {
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
|
-
|
|
|
|
- int blockSize = 5 * 1024 * 1024 ;
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
|
|
|
|
- 1L);
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
|
|
|
-
|
|
|
|
- int numOfDatanodes =2;
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
- .numDataNodes(2)
|
|
|
|
- .racks(new String[]{"/default/rack0", "/default/rack0"})
|
|
|
|
- .storagesPerDatanode(2)
|
|
|
|
- .storageTypes(new StorageType[][]{
|
|
|
|
- {StorageType.SSD, StorageType.DISK},
|
|
|
|
- {StorageType.SSD, StorageType.DISK}})
|
|
|
|
- .storageCapacities(new long[][]{
|
|
|
|
- {100 * blockSize, 20 * blockSize},
|
|
|
|
- {20 * blockSize, 100 * blockSize}})
|
|
|
|
- .build();
|
|
|
|
- cluster.waitActive();
|
|
|
|
-
|
|
|
|
- //set "/bar" directory with ONE_SSD storage policy.
|
|
|
|
- DistributedFileSystem fs = cluster.getFileSystem();
|
|
|
|
- Path barDir = new Path("/bar");
|
|
|
|
- fs.mkdir(barDir,new FsPermission((short)777));
|
|
|
|
- fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
|
|
|
|
-
|
|
|
|
- // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
|
|
|
|
- // and (DN0,SSD) and (DN1,DISK) are about 15% full.
|
|
|
|
- long fileLen = 30 * blockSize;
|
|
|
|
- // fooFile has ONE_SSD policy. So
|
|
|
|
- // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
|
|
|
|
- // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
|
|
|
|
- Path fooFile = new Path(barDir, "foo");
|
|
|
|
- createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
|
|
|
|
- // update space info
|
|
|
|
- cluster.triggerHeartbeats();
|
|
|
|
-
|
|
|
|
- BalancerParameters p = BalancerParameters.DEFAULT;
|
|
|
|
- Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
|
|
- final int r = Balancer.run(namenodes, p, conf);
|
|
|
|
-
|
|
|
|
- // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
|
|
|
|
- // already has one. Otherwise DN1 will have 2 replicas.
|
|
|
|
- // For same reason, no replicas were moved.
|
|
|
|
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Test running many balancer simultaneously.
|
|
* Test running many balancer simultaneously.
|
|
*
|
|
*
|
|
@@ -1929,121 +1546,6 @@ public class TestBalancer {
|
|
ExitStatus.SUCCESS.getExitCode(), exitCode);
|
|
ExitStatus.SUCCESS.getExitCode(), exitCode);
|
|
}
|
|
}
|
|
|
|
|
|
- /** Balancer should not move blocks with size < minBlockSize. */
|
|
|
|
- @Test(timeout=60000)
|
|
|
|
- public void testMinBlockSizeAndSourceNodes() throws Exception {
|
|
|
|
- final Configuration conf = new HdfsConfiguration();
|
|
|
|
- initConf(conf);
|
|
|
|
-
|
|
|
|
- final short replication = 3;
|
|
|
|
- final long[] lengths = {10, 10, 10, 10};
|
|
|
|
- final long[] capacities = new long[replication];
|
|
|
|
- final long totalUsed = capacities.length * sum(lengths);
|
|
|
|
- Arrays.fill(capacities, 1000);
|
|
|
|
-
|
|
|
|
- cluster = new MiniDFSCluster.Builder(conf)
|
|
|
|
- .numDataNodes(capacities.length)
|
|
|
|
- .simulatedCapacities(capacities)
|
|
|
|
- .build();
|
|
|
|
- final DistributedFileSystem dfs = cluster.getFileSystem();
|
|
|
|
- cluster.waitActive();
|
|
|
|
- client = NameNodeProxies.createProxy(conf, dfs.getUri(),
|
|
|
|
- ClientProtocol.class).getProxy();
|
|
|
|
-
|
|
|
|
- // fill up the cluster to be 80% full
|
|
|
|
- for(int i = 0; i < lengths.length; i++) {
|
|
|
|
- final long size = lengths[i];
|
|
|
|
- final Path p = new Path("/file" + i + "_size" + size);
|
|
|
|
- try(OutputStream out = dfs.create(p)) {
|
|
|
|
- for(int j = 0; j < size; j++) {
|
|
|
|
- out.write(j);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // start up an empty node with the same capacity
|
|
|
|
- cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
|
|
|
|
- LOG.info("capacities = " + Arrays.toString(capacities));
|
|
|
|
- LOG.info("totalUsedSpace= " + totalUsed);
|
|
|
|
- LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
|
|
|
|
- waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
|
|
|
|
-
|
|
|
|
- final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
|
|
|
|
-
|
|
|
|
- { // run Balancer with min-block-size=50
|
|
|
|
- final BalancerParameters p = Balancer.Cli.parse(new String[] {
|
|
|
|
- "-policy", BalancingPolicy.Node.INSTANCE.getName(),
|
|
|
|
- "-threshold", "1"
|
|
|
|
- });
|
|
|
|
- assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
|
|
|
|
- assertEquals(p.getThreshold(), 1.0, 0.001);
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
|
|
|
|
- final int r = Balancer.run(namenodes, p, conf);
|
|
|
|
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
|
|
|
|
-
|
|
|
|
- { // run Balancer with empty nodes as source nodes
|
|
|
|
- final Set<String> sourceNodes = new HashSet<>();
|
|
|
|
- final List<DataNode> datanodes = cluster.getDataNodes();
|
|
|
|
- for(int i = capacities.length; i < datanodes.size(); i++) {
|
|
|
|
- sourceNodes.add(datanodes.get(i).getDisplayName());
|
|
|
|
- }
|
|
|
|
- final BalancerParameters p = Balancer.Cli.parse(new String[] {
|
|
|
|
- "-policy", BalancingPolicy.Node.INSTANCE.getName(),
|
|
|
|
- "-threshold", "1",
|
|
|
|
- "-source", StringUtils.join(sourceNodes, ',')
|
|
|
|
- });
|
|
|
|
- assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
|
|
|
|
- assertEquals(p.getThreshold(), 1.0, 0.001);
|
|
|
|
- assertEquals(p.getSourceNodes(), sourceNodes);
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
|
|
|
|
- final int r = Balancer.run(namenodes, p, conf);
|
|
|
|
- assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- { // run Balancer with a filled node as a source node
|
|
|
|
- final Set<String> sourceNodes = new HashSet<>();
|
|
|
|
- final List<DataNode> datanodes = cluster.getDataNodes();
|
|
|
|
- sourceNodes.add(datanodes.get(0).getDisplayName());
|
|
|
|
- final BalancerParameters p = Balancer.Cli.parse(new String[] {
|
|
|
|
- "-policy", BalancingPolicy.Node.INSTANCE.getName(),
|
|
|
|
- "-threshold", "1",
|
|
|
|
- "-source", StringUtils.join(sourceNodes, ',')
|
|
|
|
- });
|
|
|
|
- assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
|
|
|
|
- assertEquals(p.getThreshold(), 1.0, 0.001);
|
|
|
|
- assertEquals(p.getSourceNodes(), sourceNodes);
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
|
|
|
|
- final int r = Balancer.run(namenodes, p, conf);
|
|
|
|
- assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- { // run Balancer with all filled node as source nodes
|
|
|
|
- final Set<String> sourceNodes = new HashSet<>();
|
|
|
|
- final List<DataNode> datanodes = cluster.getDataNodes();
|
|
|
|
- for(int i = 0; i < capacities.length; i++) {
|
|
|
|
- sourceNodes.add(datanodes.get(i).getDisplayName());
|
|
|
|
- }
|
|
|
|
- final BalancerParameters p = Balancer.Cli.parse(new String[] {
|
|
|
|
- "-policy", BalancingPolicy.Node.INSTANCE.getName(),
|
|
|
|
- "-threshold", "1",
|
|
|
|
- "-source", StringUtils.join(sourceNodes, ',')
|
|
|
|
- });
|
|
|
|
- assertEquals(p.getBalancingPolicy(), BalancingPolicy.Node.INSTANCE);
|
|
|
|
- assertEquals(p.getThreshold(), 1.0, 0.001);
|
|
|
|
- assertEquals(p.getSourceNodes(), sourceNodes);
|
|
|
|
-
|
|
|
|
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
|
|
|
|
- final int r = Balancer.run(namenodes, p, conf);
|
|
|
|
- assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
public void integrationTestWithStripedFile(Configuration conf) throws Exception {
|
|
public void integrationTestWithStripedFile(Configuration conf) throws Exception {
|
|
initConfWithStripe(conf);
|
|
initConfWithStripe(conf);
|
|
doTestBalancerWithStripedFile(conf);
|
|
doTestBalancerWithStripedFile(conf);
|