|
@@ -579,8 +579,8 @@ public class MiniDFSCluster {
|
|
|
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
|
|
|
Configuration dnConf = new HdfsConfiguration(conf);
|
|
|
if (manageDfsDirs) {
|
|
|
- File dir1 = new File(data_dir, "data"+(2*i+1));
|
|
|
- File dir2 = new File(data_dir, "data"+(2*i+2));
|
|
|
+ File dir1 = getStorageDir(i, 0);
|
|
|
+ File dir2 = getStorageDir(i, 1);
|
|
|
dir1.mkdirs();
|
|
|
dir2.mkdirs();
|
|
|
if (!dir1.isDirectory() || !dir2.isDirectory()) {
|
|
@@ -821,40 +821,40 @@ public class MiniDFSCluster {
|
|
|
* Corrupt a block on all datanode
|
|
|
*/
|
|
|
void corruptBlockOnDataNodes(ExtendedBlock block) throws Exception{
|
|
|
- for (int i=0; i < dataNodes.size(); i++)
|
|
|
- corruptBlockOnDataNode(i, block);
|
|
|
+ File[] blockFiles = getAllBlockFiles(block);
|
|
|
+ for (File f : blockFiles) {
|
|
|
+ corruptBlock(f);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
* Corrupt a block on a particular datanode
|
|
|
* Types: delete, write bad data, truncate
|
|
|
*/
|
|
|
- boolean corruptBlockOnDataNode(int i, ExtendedBlock blk) throws Exception {
|
|
|
- Random random = new Random();
|
|
|
- boolean corrupted = false;
|
|
|
- File dataDir = new File(getBaseDirectory() + "data");
|
|
|
- if (i < 0 || i >= dataNodes.size())
|
|
|
+ public static boolean corruptBlockOnDataNode(int i, ExtendedBlock blk)
|
|
|
+ throws IOException {
|
|
|
+ File blockFile = getBlockFile(i, blk);
|
|
|
+ return corruptBlock(blockFile);
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Corrupt a block on a particular datanode
|
|
|
+ */
|
|
|
+ public static boolean corruptBlock(File blockFile) throws IOException {
|
|
|
+ if (blockFile == null || !blockFile.exists()) {
|
|
|
return false;
|
|
|
-
|
|
|
- // TODO:FEDERATION use blockPoolId
|
|
|
- String blockName = blk.getBlockName();
|
|
|
- for (int dn = i*2; dn < i*2+2; dn++) {
|
|
|
- File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
|
|
|
- blockName);
|
|
|
- System.out.println("Corrupting for: " + blockFile);
|
|
|
- if (blockFile.exists()) {
|
|
|
- // Corrupt replica by writing random bytes into replica
|
|
|
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
|
|
|
- FileChannel channel = raFile.getChannel();
|
|
|
- String badString = "BADBAD";
|
|
|
- int rand = random.nextInt((int)channel.size()/2);
|
|
|
- raFile.seek(rand);
|
|
|
- raFile.write(badString.getBytes());
|
|
|
- raFile.close();
|
|
|
- }
|
|
|
- corrupted = true;
|
|
|
}
|
|
|
- return corrupted;
|
|
|
+ // Corrupt replica by writing random bytes into replica
|
|
|
+ Random random = new Random();
|
|
|
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
|
|
|
+ FileChannel channel = raFile.getChannel();
|
|
|
+ String badString = "BADBAD";
|
|
|
+ int rand = random.nextInt((int)channel.size()/2);
|
|
|
+ raFile.seek(rand);
|
|
|
+ raFile.write(badString.getBytes());
|
|
|
+ raFile.close();
|
|
|
+ LOG.warn("Corrupting the block " + blockFile);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -1191,4 +1191,86 @@ public class MiniDFSCluster {
|
|
|
public static String getBaseDirectory() {
|
|
|
return System.getProperty("test.build.data", "build/test/data") + "/dfs/";
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get a storage directory for a datanode. There are two storage directories
|
|
|
+ * per datanode:
|
|
|
+ * <ol>
|
|
|
+ * <li><base directory>/data/data<2*dnIndex + 1></li>
|
|
|
+ * <li><base directory>/data/data<2*dnIndex + 2></li>
|
|
|
+ * </ol>
|
|
|
+ *
|
|
|
+ * @param dnIndex datanode index (starts from 0)
|
|
|
+ * @param dirIndex directory index (0 or 1). Index 0 provides access to the
|
|
|
+ * first storage directory. Index 1 provides access to the second
|
|
|
+ * storage directory.
|
|
|
+ * @return Storage directory
|
|
|
+ */
|
|
|
+ public static File getStorageDir(int dnIndex, int dirIndex) {
|
|
|
+ return new File(getBaseDirectory() + "data" + (2*dnIndex + 1 + dirIndex));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get finalized directory for a block pool
|
|
|
+ * @param storageDir storage directory
|
|
|
+ * @param bpid Block pool Id
|
|
|
+ * @return finalized directory for a block pool
|
|
|
+ */
|
|
|
+ public static File getRbwDir(File storageDir, String bpid) {
|
|
|
+ return new File(storageDir, "/current/" + bpid + "/rbw/");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get finalized directory for a block pool
|
|
|
+ * @param storageDir storage directory
|
|
|
+ * @param bpid Block pool Id
|
|
|
+ * @return finalized directory for a block pool
|
|
|
+ */
|
|
|
+ public static File getFinalizedDir(File storageDir, String bpid) {
|
|
|
+ return new File(storageDir, "/current/" + bpid + "/finalized/");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get file correpsonding to a block
|
|
|
+ * @param storageDir storage directory
|
|
|
+ * @param blk block to be corrupted
|
|
|
+ * @return file corresponding to the block
|
|
|
+ */
|
|
|
+ public static File getBlockFile(File storageDir, ExtendedBlock blk) {
|
|
|
+ return new File(getFinalizedDir(storageDir, blk.getPoolId()), blk
|
|
|
+ .getBlockName());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get all files related to a block from all the datanodes
|
|
|
+ * @param block block for which corresponding files are needed
|
|
|
+ */
|
|
|
+ public File[] getAllBlockFiles(ExtendedBlock block) {
|
|
|
+ if (dataNodes.size() == 0) return new File[0];
|
|
|
+ ArrayList<File> list = new ArrayList<File>();
|
|
|
+ for (int i=0; i < dataNodes.size(); i++) {
|
|
|
+ File blockFile = getBlockFile(i, block);
|
|
|
+ if (blockFile != null) {
|
|
|
+ list.add(blockFile);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return list.toArray(new File[list.size()]);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get files related to a block for a given datanode
|
|
|
+ * @param dnIndex Index of the datanode to get block files for
|
|
|
+ * @param block block for which corresponding files are needed
|
|
|
+ */
|
|
|
+ public static File getBlockFile(int dnIndex, ExtendedBlock block) {
|
|
|
+ // Check for block file in the two storage directories of the datanode
|
|
|
+ for (int i = 0; i <=1 ; i++) {
|
|
|
+ File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
|
|
|
+ File blockFile = getBlockFile(storageDir, block);
|
|
|
+ if (blockFile.exists()) {
|
|
|
+ return blockFile;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
}
|