|
@@ -20,22 +20,14 @@ package org.apache.hadoop.hdfs;
|
|
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
|
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
-import static org.junit.Assert.assertFalse;
|
|
|
-import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
|
|
|
|
-import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
-import java.io.RandomAccessFile;
|
|
|
import java.net.InetSocketAddress;
|
|
|
-import java.nio.file.FileVisitResult;
|
|
|
-import java.nio.file.Files;
|
|
|
-import java.nio.file.SimpleFileVisitor;
|
|
|
-import java.nio.file.attribute.BasicFileAttributes;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
@@ -50,7 +42,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
-import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
@@ -62,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
|
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
|
|
+import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
|
@@ -367,7 +359,7 @@ public class TestReplication {
|
|
|
for (int i=0; i<buffer.length; i++) {
|
|
|
buffer[i] = '1';
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
Configuration conf = new HdfsConfiguration();
|
|
|
conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(numDataNodes));
|
|
@@ -387,41 +379,29 @@ public class TestReplication {
|
|
|
// get first block of the file.
|
|
|
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(testFile,
|
|
|
0, Long.MAX_VALUE).get(0).getBlock();
|
|
|
-
|
|
|
- cluster.shutdown();
|
|
|
-
|
|
|
- for (int i=0; i<25; i++) {
|
|
|
- buffer[i] = '0';
|
|
|
+
|
|
|
+ List<MaterializedReplica> replicas = new ArrayList<>();
|
|
|
+ for (int dnIndex=0; dnIndex<3; dnIndex++) {
|
|
|
+ replicas.add(cluster.getMaterializedReplica(dnIndex, block));
|
|
|
}
|
|
|
-
|
|
|
+ assertEquals(3, replicas.size());
|
|
|
+
|
|
|
+ cluster.shutdown();
|
|
|
+
|
|
|
int fileCount = 0;
|
|
|
// Choose 3 copies of block file - delete 1 and corrupt the remaining 2
|
|
|
- for (int dnIndex=0; dnIndex<3; dnIndex++) {
|
|
|
- File blockFile = cluster.getBlockFile(dnIndex, block);
|
|
|
- LOG.info("Checking for file " + blockFile);
|
|
|
-
|
|
|
- if (blockFile != null && blockFile.exists()) {
|
|
|
- if (fileCount == 0) {
|
|
|
- LOG.info("Deleting file " + blockFile);
|
|
|
- assertTrue(blockFile.delete());
|
|
|
- } else {
|
|
|
- // corrupt it.
|
|
|
- LOG.info("Corrupting file " + blockFile);
|
|
|
- long len = blockFile.length();
|
|
|
- assertTrue(len > 50);
|
|
|
- RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
|
|
|
- try {
|
|
|
- blockOut.seek(len/3);
|
|
|
- blockOut.write(buffer, 0, 25);
|
|
|
- } finally {
|
|
|
- blockOut.close();
|
|
|
- }
|
|
|
- }
|
|
|
- fileCount++;
|
|
|
+ for (MaterializedReplica replica : replicas) {
|
|
|
+ if (fileCount == 0) {
|
|
|
+ LOG.info("Deleting block " + replica);
|
|
|
+ replica.deleteData();
|
|
|
+ } else {
|
|
|
+ // corrupt it.
|
|
|
+ LOG.info("Corrupting file " + replica);
|
|
|
+ replica.corruptData();
|
|
|
}
|
|
|
+ fileCount++;
|
|
|
}
|
|
|
- assertEquals(3, fileCount);
|
|
|
-
|
|
|
+
|
|
|
/* Start the MiniDFSCluster with more datanodes since once a writeBlock
|
|
|
* to a datanode node fails, same block can not be written to it
|
|
|
* immediately. In our case some replication attempts will fail.
|
|
@@ -530,63 +510,28 @@ public class TestReplication {
|
|
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
|
|
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
|
|
FileSystem fs = cluster.getFileSystem();
|
|
|
- FSDataOutputStream create = fs.create(new Path("/test"));
|
|
|
- fs.setReplication(new Path("/test"), (short) 1);
|
|
|
+ Path filePath = new Path("/test");
|
|
|
+ FSDataOutputStream create = fs.create(filePath);
|
|
|
+ fs.setReplication(filePath, (short) 1);
|
|
|
create.write(new byte[1024]);
|
|
|
create.close();
|
|
|
|
|
|
- List<File> nonParticipatedNodeDirs = new ArrayList<File>();
|
|
|
- File participatedNodeDirs = null;
|
|
|
- for (int i = 0; i < cluster.getDataNodes().size(); i++) {
|
|
|
- File storageDir = cluster.getInstanceStorageDir(i, 0);
|
|
|
- String bpid = cluster.getNamesystem().getBlockPoolId();
|
|
|
- File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
|
|
|
- if (data_dir.listFiles().length == 0) {
|
|
|
- nonParticipatedNodeDirs.add(data_dir);
|
|
|
- } else {
|
|
|
- assertNull("participatedNodeDirs has already been set.",
|
|
|
- participatedNodeDirs);
|
|
|
- participatedNodeDirs = data_dir;
|
|
|
- }
|
|
|
- }
|
|
|
- assertEquals(2, nonParticipatedNodeDirs.size());
|
|
|
-
|
|
|
- String blockFile = null;
|
|
|
- final List<File> listFiles = new ArrayList<>();
|
|
|
- Files.walkFileTree(participatedNodeDirs.toPath(),
|
|
|
- new SimpleFileVisitor<java.nio.file.Path>() {
|
|
|
- @Override
|
|
|
- public FileVisitResult visitFile(
|
|
|
- java.nio.file.Path file, BasicFileAttributes attrs)
|
|
|
- throws IOException {
|
|
|
- listFiles.add(file.toFile());
|
|
|
- return FileVisitResult.CONTINUE;
|
|
|
- }
|
|
|
- }
|
|
|
- );
|
|
|
- assertFalse(listFiles.isEmpty());
|
|
|
+ ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
|
|
|
int numReplicaCreated = 0;
|
|
|
- for (File file : listFiles) {
|
|
|
- if (file.getName().startsWith(Block.BLOCK_FILE_PREFIX)
|
|
|
- && !file.getName().endsWith("meta")) {
|
|
|
- blockFile = file.getName();
|
|
|
- for (File file1 : nonParticipatedNodeDirs) {
|
|
|
- file1.mkdirs();
|
|
|
- new File(file1, blockFile).createNewFile();
|
|
|
- new File(file1, blockFile + "_1000.meta").createNewFile();
|
|
|
- numReplicaCreated++;
|
|
|
- }
|
|
|
- break;
|
|
|
+ for (final DataNode dn : cluster.getDataNodes()) {
|
|
|
+ if (!dn.getFSDataset().contains(block)) {
|
|
|
+ cluster.getFsDatasetTestUtils(dn).injectCorruptReplica(block);
|
|
|
+ numReplicaCreated++;
|
|
|
}
|
|
|
}
|
|
|
assertEquals(2, numReplicaCreated);
|
|
|
|
|
|
- fs.setReplication(new Path("/test"), (short) 3);
|
|
|
+ fs.setReplication(filePath, (short) 3);
|
|
|
cluster.restartDataNodes(); // Lets detect all DNs about dummy copied
|
|
|
// blocks
|
|
|
cluster.waitActive();
|
|
|
cluster.triggerBlockReports();
|
|
|
- DFSTestUtil.waitReplication(fs, new Path("/test"), (short) 3);
|
|
|
+ DFSTestUtil.waitReplication(fs, filePath, (short) 3);
|
|
|
} finally {
|
|
|
if (cluster != null) {
|
|
|
cluster.shutdown();
|