@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.mover;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -26,10 +27,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -55,6 +58,8 @@ import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
@@ -599,6 +604,22 @@ public class TestStorageMover {
     }
   }
 
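+  /**
+   * Mark every volume of the given storage type on the DataNode as full by
+   * shrinking its capacity to zero, so no new blocks can be placed there.
+   */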
+  private void setVolumeFull(DataNode dn, StorageType type) {
+    List<? extends FsVolumeSpi> volumes = dn.getFSDataset().getVolumes();
+    for (int j = 0; j < volumes.size(); ++j) {
+      FsVolumeImpl volume = (FsVolumeImpl) volumes.get(j);
+      if (volume.getStorageType() == type) {
+        LOG.info("setCapacity to 0 for [" + volume.getStorageType() + "] "
+            + volume.getStorageID());
+        volume.setCapacityForTesting(0);
+      }
+    }
+  }
+
   /**
    * Test DISK is running out of spaces.
    */
@@ -608,76 +629,54 @@
     final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
     final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
 
-    final long diskCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
-        * BLOCK_SIZE;
-    final long archiveCapacity = 100 * BLOCK_SIZE;
-    final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
-        diskCapacity, archiveCapacity);
     Configuration conf = new Configuration(DEFAULT_CONF);
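+    // no preset capacities; DISK volumes are marked full below via setVolumeFull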
     final ClusterScheme clusterScheme = new ClusterScheme(conf,
-        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
     final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
 
     try {
       test.runBasicTest(false);
 
-      // create hot files with replication 3 until not more spaces.
+      // create 2 hot files with replication 3
       final short replication = 3;
-      {
-        int hotFileCount = 0;
-        try {
-          for (; ; hotFileCount++) {
-            final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
-            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
-            waitForAllReplicas(replication, p, test.dfs);
-          }
-        } catch (IOException e) {
-          LOG.info("Expected: hotFileCount=" + hotFileCount, e);
-        }
-        Assert.assertTrue(hotFileCount >= 1);
-      }
-
-      // create hot files with replication 1 to use up all remaining spaces.
-      {
-        int hotFileCount_r1 = 0;
-        try {
-          for (; ; hotFileCount_r1++) {
-            final Path p = new Path(pathPolicyMap.hot, "file_r1_" + hotFileCount_r1);
-            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
-            waitForAllReplicas(1, p, test.dfs);
-          }
-        } catch (IOException e) {
-          LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
-        }
-      }
-
-      { // test increasing replication. Since DISK is full,
-        // new replicas should be stored in ARCHIVE as a fallback storage.
-        final Path file0 = new Path(pathPolicyMap.hot, "file0");
-        final Replication r = test.getReplication(file0);
-        final short newReplication = (short) 5;
-        test.dfs.setReplication(file0, newReplication);
-        Thread.sleep(10000);
-        test.verifyReplication(file0, r.disk, newReplication - r.disk);
-      }
-
-      { // test creating a cold file and then increase replication
-        final Path p = new Path(pathPolicyMap.cold, "foo");
+      for (int i = 0; i < 2; i++) {
+        final Path p = new Path(pathPolicyMap.hot, "file" + i);
         DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
-        test.verifyReplication(p, 0, replication);
-
-        final short newReplication = 5;
-        test.dfs.setReplication(p, newReplication);
-        Thread.sleep(10000);
-        test.verifyReplication(p, 0, newReplication);
+        waitForAllReplicas(replication, p, test.dfs);
       }
 
-      { //test move a hot file to warm
-        final Path file1 = new Path(pathPolicyMap.hot, "file1");
-        test.dfs.rename(file1, pathPolicyMap.warm);
-        test.migrate();
-        test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
+      // set all the DISK volumes to full
+      for (DataNode dn : test.cluster.getDataNodes()) {
+        setVolumeFull(dn, StorageType.DISK);
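+        // the heartbeat reports the updated (zero) capacity to the NameNode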
+        DataNodeTestUtils.triggerHeartbeat(dn);
       }
+
+      // test increasing replication. Since DISK is full,
+      // new replicas should be stored in ARCHIVE as a fallback storage.
+      final Path file0 = new Path(pathPolicyMap.hot, "file0");
+      final Replication r = test.getReplication(file0);
+      final short newReplication = (short) 5;
+      test.dfs.setReplication(file0, newReplication);
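+      // give the NameNode time to schedule and finish the new replications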
+      Thread.sleep(10000);
+      test.verifyReplication(file0, r.disk, newReplication - r.disk);
+
+      // test creating a cold file and then increase replication
+      final Path p = new Path(pathPolicyMap.cold, "foo");
+      DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+      test.verifyReplication(p, 0, replication);
+
+      test.dfs.setReplication(p, newReplication);
+      Thread.sleep(10000);
+      test.verifyReplication(p, 0, newReplication);
+
+      // test move a hot file to warm
+      final Path file1 = new Path(pathPolicyMap.hot, "file1");
+      test.dfs.rename(file1, pathPolicyMap.warm);
+      test.migrate();
+      test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
     } finally {
       test.shutdownCluster();
     }
@@ -692,53 +691,33 @@
     final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
     final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
 
-    final long diskCapacity = 100 * BLOCK_SIZE;
-    final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
-        * BLOCK_SIZE;
-    final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
-        diskCapacity, archiveCapacity);
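+    // no preset capacities; ARCHIVE volumes are marked full below via setVolumeFull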
     final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
-        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
     final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
 
     try {
       test.runBasicTest(false);
 
-      // create cold files with replication 3 until not more spaces.
+      // create 2 cold files with replication 3
       final short replication = 3;
-      {
-        int coldFileCount = 0;
-        try {
-          for (; ; coldFileCount++) {
-            final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount);
-            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
-            waitForAllReplicas(replication, p, test.dfs);
-          }
-        } catch (IOException e) {
-          LOG.info("Expected: coldFileCount=" + coldFileCount, e);
-        }
-        Assert.assertTrue(coldFileCount >= 1);
+      for (int i = 0; i < 2; i++) {
+        final Path p = new Path(pathPolicyMap.cold, "file" + i);
+        DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+        waitForAllReplicas(replication, p, test.dfs);
       }
 
-      // create cold files with replication 1 to use up all remaining spaces.
-      {
-        int coldFileCount_r1 = 0;
-        try {
-          for (; ; coldFileCount_r1++) {
-            final Path p = new Path(pathPolicyMap.cold, "file_r1_" + coldFileCount_r1);
-            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
-            waitForAllReplicas(1, p, test.dfs);
-          }
-        } catch (IOException e) {
-          LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
-        }
+      // set all the ARCHIVE volumes to full
+      for (DataNode dn : test.cluster.getDataNodes()) {
+        setVolumeFull(dn, StorageType.ARCHIVE);
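+        // the heartbeat reports the updated (zero) capacity to the NameNode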
+        DataNodeTestUtils.triggerHeartbeat(dn);
       }
 
       { // test increasing replication but new replicas cannot be created
         // since no more ARCHIVE space.
         final Path file0 = new Path(pathPolicyMap.cold, "file0");
         final Replication r = test.getReplication(file0);
-        LOG.info("XXX " + file0 + ": replication=" + r);
         Assert.assertEquals(0, r.disk);
 
         final short newReplication = (short) 5;