|
@@ -19,17 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
import java.io.*;
|
|
|
-import java.util.Collection;
|
|
|
import java.util.Iterator;
|
|
|
-import java.util.Random;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
import org.apache.hadoop.fs.permission.*;
|
|
|
|
|
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
|
-import org.apache.hadoop.io.ArrayWritable;
|
|
|
-import org.apache.hadoop.io.UTF8;
|
|
|
-import org.apache.hadoop.io.Writable;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
|
|
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
|
|
@@ -39,31 +34,32 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeFile;
|
|
|
* This class tests the creation and validation of a checkpoint.
|
|
|
*/
|
|
|
public class TestEditLog extends TestCase {
|
|
|
- static final int numDatanodes = 1;
|
|
|
+ static final int NUM_DATA_NODES = 1;
|
|
|
|
|
|
- // This test creates numThreads threads and each thread does
|
|
|
- // 2 * numberTransactions Transactions concurrently.
|
|
|
- int numberTransactions = 100;
|
|
|
- int numThreads = 100;
|
|
|
+ // This test creates NUM_THREADS threads and each thread does
|
|
|
+ // 2 * NUM_TRANSACTIONS Transactions concurrently.
|
|
|
+ static final int NUM_TRANSACTIONS = 100;
|
|
|
+ static final int NUM_THREADS = 100;
|
|
|
|
|
|
//
|
|
|
// an object that does a bunch of transactions
|
|
|
//
|
|
|
static class Transactions implements Runnable {
|
|
|
- FSEditLog editLog;
|
|
|
+ FSNamesystem namesystem;
|
|
|
int numTransactions;
|
|
|
short replication = 3;
|
|
|
long blockSize = 64;
|
|
|
|
|
|
- Transactions(FSEditLog editlog, int num) {
|
|
|
- editLog = editlog;
|
|
|
+ Transactions(FSNamesystem ns, int num) {
|
|
|
+ namesystem = ns;
|
|
|
numTransactions = num;
|
|
|
}
|
|
|
|
|
|
// add a bunch of transactions.
|
|
|
public void run() {
|
|
|
- PermissionStatus p = FSNamesystem.getFSNamesystem(
|
|
|
- ).createFsOwnerPermissions(new FsPermission((short)0777));
|
|
|
+ PermissionStatus p = namesystem.createFsOwnerPermissions(
|
|
|
+ new FsPermission((short)0777));
|
|
|
+ FSEditLog editLog = namesystem.getEditLog();
|
|
|
|
|
|
for (int i = 0; i < numTransactions; i++) {
|
|
|
try {
|
|
@@ -86,74 +82,71 @@ public class TestEditLog extends TestCase {
|
|
|
public void testEditLog() throws IOException {
|
|
|
|
|
|
// start a cluster
|
|
|
-
|
|
|
- Collection<File> namedirs = null;
|
|
|
- Collection<File> editsdirs = null;
|
|
|
Configuration conf = new Configuration();
|
|
|
- MiniDFSCluster cluster = new MiniDFSCluster(0, conf, numDatanodes,
|
|
|
- true, true, null, null);
|
|
|
- cluster.waitActive();
|
|
|
- FileSystem fileSys = cluster.getFileSystem();
|
|
|
- int numdirs = 0;
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ FileSystem fileSys = null;
|
|
|
|
|
|
try {
|
|
|
- namedirs = cluster.getNameDirs();
|
|
|
- editsdirs = cluster.getNameEditsDirs();
|
|
|
- } finally {
|
|
|
- fileSys.close();
|
|
|
- cluster.shutdown();
|
|
|
- }
|
|
|
-
|
|
|
- for (Iterator it = namedirs.iterator(); it.hasNext(); ) {
|
|
|
- File dir = (File)it.next();
|
|
|
- System.out.println(dir);
|
|
|
- numdirs++;
|
|
|
- }
|
|
|
-
|
|
|
- FSImage fsimage = new FSImage(namedirs, editsdirs);
|
|
|
- FSEditLog editLog = fsimage.getEditLog();
|
|
|
-
|
|
|
- // set small size of flush buffer
|
|
|
- editLog.setBufferCapacity(2048);
|
|
|
- editLog.close();
|
|
|
- editLog.open();
|
|
|
+ cluster = new MiniDFSCluster(conf, NUM_DATA_NODES, true, null);
|
|
|
+ cluster.waitActive();
|
|
|
+ fileSys = cluster.getFileSystem();
|
|
|
+ final FSNamesystem namesystem = cluster.getNamesystem();
|
|
|
|
|
|
- // Create threads and make them run transactions concurrently.
|
|
|
- Thread threadId[] = new Thread[numThreads];
|
|
|
- for (int i = 0; i < numThreads; i++) {
|
|
|
- Transactions trans = new Transactions(editLog, numberTransactions);
|
|
|
- threadId[i] = new Thread(trans, "TransactionThread-" + i);
|
|
|
- threadId[i].start();
|
|
|
- }
|
|
|
-
|
|
|
- // wait for all transactions to get over
|
|
|
- for (int i = 0; i < numThreads; i++) {
|
|
|
- try {
|
|
|
- threadId[i].join();
|
|
|
- } catch (InterruptedException e) {
|
|
|
- i--; // retry
|
|
|
+ for (Iterator<File> it = cluster.getNameDirs().iterator(); it.hasNext(); ) {
|
|
|
+ File dir = it.next();
|
|
|
+ System.out.println(dir);
|
|
|
}
|
|
|
- }
|
|
|
+
|
|
|
+ FSImage fsimage = namesystem.getFSImage();
|
|
|
+ FSEditLog editLog = fsimage.getEditLog();
|
|
|
+
|
|
|
+ // set small size of flush buffer
|
|
|
+ FSEditLog.setBufferCapacity(2048);
|
|
|
+ editLog.close();
|
|
|
+ editLog.open();
|
|
|
|
|
|
- editLog.close();
|
|
|
-
|
|
|
- // Verify that we can read in all the transactions that we have written.
|
|
|
- // If there were any corruptions, it is likely that the reading in
|
|
|
- // of these transactions will throw an exception.
|
|
|
- //
|
|
|
- for (Iterator<StorageDirectory> it =
|
|
|
- fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
|
|
- File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
|
|
|
- System.out.println("Verifying file: " + editFile);
|
|
|
- int numEdits = FSEditLog.loadFSEdits(new EditLogFileInputStream(editFile));
|
|
|
- int numLeases = FSNamesystem.getFSNamesystem().leaseManager.countLease();
|
|
|
- System.out.println("Number of outstanding leases " + numLeases);
|
|
|
- assertEquals(0, numLeases);
|
|
|
- assertTrue("Verification for " + editFile + " failed. " +
|
|
|
- "Expected " + (numThreads * 2 * numberTransactions) + " transactions. "+
|
|
|
- "Found " + numEdits + " transactions.",
|
|
|
- numEdits == numThreads * 2 * numberTransactions);
|
|
|
-
|
|
|
+ // Create threads and make them run transactions concurrently.
|
|
|
+ Thread threadId[] = new Thread[NUM_THREADS];
|
|
|
+ for (int i = 0; i < NUM_THREADS; i++) {
|
|
|
+ Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
|
|
|
+ threadId[i] = new Thread(trans, "TransactionThread-" + i);
|
|
|
+ threadId[i].start();
|
|
|
+ }
|
|
|
+
|
|
|
+ // wait for all transactions to get over
|
|
|
+ for (int i = 0; i < NUM_THREADS; i++) {
|
|
|
+ try {
|
|
|
+ threadId[i].join();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ i--; // retry
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ editLog.close();
|
|
|
+ editLog.open();
|
|
|
+
|
|
|
+ // Verify that we can read in all the transactions that we have written.
|
|
|
+ // If there were any corruptions, it is likely that the reading in
|
|
|
+ // of these transactions will throw an exception.
|
|
|
+ //
|
|
|
+ for (Iterator<StorageDirectory> it =
|
|
|
+ fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
|
|
+ File editFile = FSImage.getImageFile(it.next(), NameNodeFile.EDITS);
|
|
|
+ System.out.println("Verifying file: " + editFile);
|
|
|
+ int numEdits = namesystem.getEditLog().loadFSEdits(
|
|
|
+ new EditLogFileInputStream(editFile));
|
|
|
+ int numLeases = namesystem.leaseManager.countLease();
|
|
|
+ System.out.println("Number of outstanding leases " + numLeases);
|
|
|
+ assertEquals(0, numLeases);
|
|
|
+ assertTrue("Verification for " + editFile + " failed. " +
|
|
|
+ "Expected " + (NUM_THREADS * 2 * NUM_TRANSACTIONS) + " transactions. "+
|
|
|
+ "Found " + numEdits + " transactions.",
|
|
|
+ numEdits == NUM_THREADS * 2 * NUM_TRANSACTIONS);
|
|
|
+
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if(fileSys != null) fileSys.close();
|
|
|
+ if(cluster != null) cluster.shutdown();
|
|
|
}
|
|
|
}
|
|
|
}
|